pitrou commented on code in PR #13796:
URL: https://github.com/apache/arrow/pull/13796#discussion_r946571962


##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -34,10 +34,24 @@ namespace fs {
 
 /// Options for the LocalFileSystem implementation.
 struct ARROW_EXPORT LocalFileSystemOptions {
+  static constexpr int32_t kDefaultDirectoryReadahead = 1;
+  static constexpr int32_t kDefaultFileInfoBatchSize = 1000;
+
   /// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
   /// or a regular one.
   bool use_mmap = false;
 
+  /// Options related to `GetFileSystemGenerator` interface.

Review Comment:
   ```suggestion
     /// Options related to `GetFileInfoGenerator` interface.
   ```



##########
cpp/src/arrow/filesystem/localfs_benchmark.cc:
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/status.h>
+#include <arrow/testing/random.h>
+#include <arrow/util/async_generator.h>
+#include <arrow/util/string_view.h>
+#include <memory>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/formatting.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
+
+#include "parquet/arrow/writer.h"

Review Comment:
   Can we make the benchmark not use Parquet? I think we want to minimize 
coupling here.
   
   Just create a dummy filesystem structure and run your benchmark over that.



##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,240 @@ Result<std::vector<FileInfo>> 
LocalFileSystem::GetFileInfo(const FileSelector& s
   return results;
 }
 
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual 
directories
+///    so that each directory results are yielded as a separate 
`FileInfoGenerator` via
+///    an underlying `DiscoveryImplIterator`, which delivers items in chunks 
(default size
+///    is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+///    `DiscoverDirectoryFiles`, with the difference that the results from 
individual
+///    sub-directory iterators are merged into the single FileInfoGenerator 
stream.
+///
+/// The implementation makes use of additional attributes in 
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are 
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+  using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+  /// Helper class, which wraps the producer instance and ensures
+  /// that it stays "open" (i.e. `producer.is_closed() == false`)
+  /// until all discovery iterators are exhausted.
+  ///
+  /// The calss maintains a shared `State`, which wraps the original producer
+  /// and automatically calls `Close()` on it once the ref-count for the
+  /// state reaches zero (which is equivalent to finishing the file discovery
+  /// process).
+  class AutoClosingProducer {
+   public:
+    explicit AutoClosingProducer(FileInfoGeneratorProducer producer)
+        : state_(std::make_shared<State>(std::move(producer))) {}
+
+    bool Push(arrow::Result<FileInfoGenerator> value) {
+      return state_ && state_->Push(std::move(value));
+    }
+
+    void Finish() { state_.reset(); }
+
+    bool IsFinished() { return !state_; }
+
+   private:
+    struct State {
+      State(FileInfoGeneratorProducer producer) : wrapped(std::move(producer)) 
{}
+
+      ~State() { wrapped.Close(); }
+
+      bool Push(arrow::Result<FileInfoGenerator> value) {
+        return wrapped.Push(std::move(value));
+      }
+
+      FileInfoGeneratorProducer wrapped;
+    };
+
+    std::shared_ptr<State> state_;
+  };
+
+  /// The main procedure to start async streaming discovery using a given 
`FileSelector`.
+  ///
+  /// The result is a two-level generator, i.e. "generator of 
FileInfoGenerator:s",
+  /// where each individual generator represents an FileInfo item stream from 
coming an
+  /// individual sub-directory under the selector's `base_dir`.
+  static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+      FileSelector selector, LocalFileSystemOptions fs_opts) {
+    PushGenerator<FileInfoGenerator> file_gen;
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto base_dir, 
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+    ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0, 
std::move(selector),
+                                    AutoClosingProducer(file_gen.producer()),
+                                    fs_opts.file_info_batch_size));
+
+    return file_gen;
+  }
+
+  /// Version of `DiscoverDirectoryFiles` which flattens the stream of 
generators
+  /// into a single FileInfoGenerator stream.
+  /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine 
how much
+  /// readahead should happen.
+  static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+      FileSelector selector, LocalFileSystemOptions fs_opts) {
+    int32_t dir_readahead = fs_opts.directory_readahead;
+    ARROW_ASSIGN_OR_RAISE(
+        auto part_gen, DiscoverDirectoryFiles(std::move(selector), 
std::move(fs_opts)));
+    return dir_readahead > 1
+               ? MakeSequencedMergedGenerator(std::move(part_gen), 
dir_readahead)
+               : MakeConcatenatedGenerator(std::move(part_gen));
+  }
+
+ private:
+  /// The class, which implements iterator interface to traverse a given
+  /// directory at the fixed nesting depth, and possibly recurses into
+  /// sub-directories (if specified by the selector), spawning more
+  /// `DiscoveryImplIterators`, which feed their data into a single producer.
+  class DiscoveryImplIterator {
+    const PlatformFilename dir_fn_;
+    const int32_t nesting_depth_;
+    const FileSelector selector_;
+    const uint32_t file_info_batch_size_;
+
+    AutoClosingProducer file_gen_producer_;
+    FileInfoVector current_chunk_;
+    std::vector<PlatformFilename> child_fns_;
+    size_t idx_ = 0;
+    bool initialized_ = false;
+
+   public:
+    DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+                          FileSelector selector, AutoClosingProducer 
file_gen_producer,
+                          uint32_t file_info_batch_size)
+        : dir_fn_(std::move(dir_fn)),
+          nesting_depth_(nesting_depth),
+          selector_(std::move(selector)),
+          file_info_batch_size_(file_info_batch_size),
+          file_gen_producer_(std::move(file_gen_producer)) {}
+
+    /// Pre-initialize the iterator by listing directory contents and caching
+    /// in the current instance.
+    Status Initialize() {
+      auto result = arrow::internal::ListDir(dir_fn_);
+      if (!result.ok()) {
+        auto status = result.status();
+        if (selector_.allow_not_found && status.IsIOError()) {
+          ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn_));
+          if (!exists) {
+            return Status::OK();
+          }
+        }
+        return status;
+      }
+      child_fns_ = result.MoveValueUnsafe();
+
+      const size_t dirent_count = child_fns_.size();
+      current_chunk_.reserve(dirent_count >= file_info_batch_size_ ? 
file_info_batch_size_
+                                                                   : 
dirent_count);
+
+      initialized_ = true;
+      return Status::OK();
+    }
+
+    Result<FileInfoVector> Next() {
+      if (!initialized_) {
+        auto init = Initialize();
+        if (!init.ok()) {
+          return Finish(init);
+        }
+      }
+      while (idx_ < child_fns_.size()) {
+        auto full_fn = dir_fn_.Join(child_fns_[idx_++]);
+        auto res = StatFile(full_fn.ToNative());
+        if (!res.ok()) {
+          return Finish(res.status());
+        }
+
+        auto info = res.MoveValueUnsafe();
+
+        // Try to recurse into subdirectories, if needed.
+        if (info.type() == FileType::Directory &&
+            nesting_depth_ < selector_.max_recursion && selector_.recursive) {
+          auto status = DoDiscovery(std::move(full_fn), nesting_depth_ + 1, 
selector_,
+                                    file_gen_producer_, file_info_batch_size_);
+          if (!status.ok()) {
+            return Finish(status);
+          }
+        }
+        // Everything is ok. Add the item to the current chunk of data.
+        current_chunk_.emplace_back(std::move(info));
+        // Keep `current_chunk_` as large, as `batch_size_`.
+        // Otherwise, yield the complete chunk to the caller.
+        if (current_chunk_.size() == file_info_batch_size_) {
+          FileInfoVector yield_vec;
+          std::swap(yield_vec, current_chunk_);
+          const size_t items_left = child_fns_.size() - idx_;
+          current_chunk_.reserve(
+              items_left >= file_info_batch_size_ ? file_info_batch_size_ : 
items_left);
+          return yield_vec;
+        }
+      }  // while (idx_ < child_fns_.size())
+
+      // Flush out remaining items
+      if (!current_chunk_.empty()) {
+        return std::move(current_chunk_);
+      }
+      return Finish();
+    }
+
+   private:
+    /// Close the producer end of stream and return iteration end marker.
+    Result<FileInfoVector> Finish(Status status = Status::OK()) {
+      file_gen_producer_.Finish();
+      ARROW_RETURN_NOT_OK(status);
+      return IterationEnd<FileInfoVector>();
+    }
+  };
+
+  /// Create an instance of  `DiscoveryImplIterator` under the hood for the
+  /// specified directory, wrap it in the `BackgroundGenerator + 
TransferredGenerator`
+  /// bundle and feed the results to the main producer queue.
+  static Status DoDiscovery(const PlatformFilename& dir_fn, int32_t 
nesting_depth,
+                            FileSelector selector, AutoClosingProducer 
file_gen_producer,
+                            int32_t file_info_batch_size) {
+    ARROW_RETURN_IF(file_gen_producer.IsFinished(),
+                    arrow::Status::Cancelled("Discovery cancelled"));
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto gen,
+        MakeBackgroundGenerator(Iterator<FileInfoVector>(DiscoveryImplIterator(
+                                    std::move(dir_fn), nesting_depth, 
std::move(selector),
+                                    file_gen_producer, file_info_batch_size)),
+                                io::default_io_context().executor()));
+    gen = MakeTransferredGenerator(std::move(gen), 
arrow::internal::GetCpuThreadPool());

Review Comment:
   Hmm... why are we transferring to the CPU thread pool? That doesn't seem 
necessary.
   cc @westonpace 



##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,240 @@ Result<std::vector<FileInfo>> 
LocalFileSystem::GetFileInfo(const FileSelector& s
   return results;
 }
 
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual 
directories
+///    so that each directory results are yielded as a separate 
`FileInfoGenerator` via
+///    an underlying `DiscoveryImplIterator`, which delivers items in chunks 
(default size
+///    is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+///    `DiscoverDirectoryFiles`, with the difference that the results from 
individual
+///    sub-directory iterators are merged into the single FileInfoGenerator 
stream.
+///
+/// The implementation makes use of additional attributes in 
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are 
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+  using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+  /// Helper class, which wraps the producer instance and ensures
+  /// that it stays "open" (i.e. `producer.is_closed() == false`)
+  /// until all discovery iterators are exhausted.
+  ///
+  /// The calss maintains a shared `State`, which wraps the original producer
+  /// and automatically calls `Close()` on it once the ref-count for the
+  /// state reaches zero (which is equivalent to finishing the file discovery
+  /// process).
+  class AutoClosingProducer {

Review Comment:
   `PushGenerator` is thread-safe, but unfortunately `AutoClosingProducer` is 
not.
   
   It seems you can vastly simplify this and make it thread-safe by letting C++ 
do the work:
   ```c++
   struct DiscoveryState {
     explicit DiscoveryState(FileInfoGeneratorProducer p) : producer(std::move(p)) {}
   
     ~DiscoveryState() {
       producer.Close();
     }
   
     FileInfoGeneratorProducer producer;
   };
   ```
   ... then pass the same `std::shared_ptr<DiscoveryState>` to every 
`DiscoveryImplIterator`:
   ```c++
     static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
         FileSelector selector, LocalFileSystemOptions fs_opts) {
       PushGenerator<FileInfoGenerator> file_gen;
   
       ARROW_ASSIGN_OR_RAISE(
           auto base_dir, 
arrow::internal::PlatformFilename::FromString(selector.base_dir));
       auto discovery_state = 
std::make_shared<DiscoveryState>(std::move(file_gen.producer()));
       ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0, 
std::move(selector),
                                       std::move(discovery_state),
                                       fs_opts.file_info_batch_size));
   
       return file_gen;
     }
   ```
   



##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -34,10 +34,24 @@ namespace fs {
 
 /// Options for the LocalFileSystem implementation.
 struct ARROW_EXPORT LocalFileSystemOptions {
+  static constexpr int32_t kDefaultDirectoryReadahead = 1;
+  static constexpr int32_t kDefaultFileInfoBatchSize = 1000;
+
   /// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
   /// or a regular one.
   bool use_mmap = false;
 
+  /// Options related to `GetFileSystemGenerator` interface.
+
+  /// How many directories should be processed in parallel
+  /// by the `GetFileSystemGenerator` impl.
+  int32_t directory_readahead = kDefaultDirectoryReadahead;
+  /// Specifies how much entries shall be aggregated into
+  /// a single FileInfoVector chunk by the `GetFileSystemGenerator` impl, which

Review Comment:
   ```suggestion
     /// a single FileInfoVector chunk by the `GetFileInfoGenerator` impl, which
   ```



##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -34,10 +34,24 @@ namespace fs {
 
 /// Options for the LocalFileSystem implementation.
 struct ARROW_EXPORT LocalFileSystemOptions {
+  static constexpr int32_t kDefaultDirectoryReadahead = 1;
+  static constexpr int32_t kDefaultFileInfoBatchSize = 1000;
+
   /// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
   /// or a regular one.
   bool use_mmap = false;
 
+  /// Options related to `GetFileSystemGenerator` interface.
+
+  /// How many directories should be processed in parallel
+  /// by the `GetFileSystemGenerator` impl.

Review Comment:
   ```suggestion
     /// by the `GetFileInfoGenerator` impl.
   ```



##########
cpp/src/arrow/filesystem/CMakeLists.txt:
##########
@@ -28,6 +28,23 @@ add_arrow_test(filesystem-test
                EXTRA_LABELS
                filesystem)
 
+if(ARROW_BUILD_BENCHMARKS AND ARROW_PARQUET)
+    add_arrow_benchmark(localfs_benchmark
+                        PREFIX
+                        "arrow-filesystem"
+                        SOURCES
+                        localfs_benchmark.cc
+                        STATIC_LINK_LIBS
+                        ${ARROW_BENCHMARK_LINK_LIBS}
+                        Boost::filesystem
+                        Boost::system)

Review Comment:
   Looks like the Boost dependency isn't needed?



##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,240 @@ Result<std::vector<FileInfo>> 
LocalFileSystem::GetFileInfo(const FileSelector& s
   return results;
 }
 
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual 
directories
+///    so that each directory results are yielded as a separate 
`FileInfoGenerator` via
+///    an underlying `DiscoveryImplIterator`, which delivers items in chunks 
(default size
+///    is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+///    `DiscoverDirectoryFiles`, with the difference that the results from 
individual
+///    sub-directory iterators are merged into the single FileInfoGenerator 
stream.
+///
+/// The implementation makes use of additional attributes in 
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are 
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+  using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+  /// Helper class, which wraps the producer instance and ensures
+  /// that it stays "open" (i.e. `producer.is_closed() == false`)
+  /// until all discovery iterators are exhausted.
+  ///
+  /// The calss maintains a shared `State`, which wraps the original producer
+  /// and automatically calls `Close()` on it once the ref-count for the
+  /// state reaches zero (which is equivalent to finishing the file discovery
+  /// process).
+  class AutoClosingProducer {
+   public:
+    explicit AutoClosingProducer(FileInfoGeneratorProducer producer)
+        : state_(std::make_shared<State>(std::move(producer))) {}
+
+    bool Push(arrow::Result<FileInfoGenerator> value) {
+      return state_ && state_->Push(std::move(value));
+    }
+
+    void Finish() { state_.reset(); }
+
+    bool IsFinished() { return !state_; }
+
+   private:
+    struct State {
+      State(FileInfoGeneratorProducer producer) : wrapped(std::move(producer)) 
{}
+
+      ~State() { wrapped.Close(); }
+
+      bool Push(arrow::Result<FileInfoGenerator> value) {
+        return wrapped.Push(std::move(value));
+      }
+
+      FileInfoGeneratorProducer wrapped;
+    };
+
+    std::shared_ptr<State> state_;
+  };
+
+  /// The main procedure to start async streaming discovery using a given 
`FileSelector`.
+  ///
+  /// The result is a two-level generator, i.e. "generator of 
FileInfoGenerator:s",
+  /// where each individual generator represents an FileInfo item stream from 
coming an
+  /// individual sub-directory under the selector's `base_dir`.
+  static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+      FileSelector selector, LocalFileSystemOptions fs_opts) {
+    PushGenerator<FileInfoGenerator> file_gen;
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto base_dir, 
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+    ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0, 
std::move(selector),
+                                    AutoClosingProducer(file_gen.producer()),
+                                    fs_opts.file_info_batch_size));
+
+    return file_gen;
+  }
+
+  /// Version of `DiscoverDirectoryFiles` which flattens the stream of 
generators
+  /// into a single FileInfoGenerator stream.
+  /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine 
how much
+  /// readahead should happen.
+  static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+      FileSelector selector, LocalFileSystemOptions fs_opts) {
+    int32_t dir_readahead = fs_opts.directory_readahead;
+    ARROW_ASSIGN_OR_RAISE(
+        auto part_gen, DiscoverDirectoryFiles(std::move(selector), 
std::move(fs_opts)));
+    return dir_readahead > 1
+               ? MakeSequencedMergedGenerator(std::move(part_gen), 
dir_readahead)
+               : MakeConcatenatedGenerator(std::move(part_gen));
+  }
+
+ private:
+  /// The class, which implements iterator interface to traverse a given
+  /// directory at the fixed nesting depth, and possibly recurses into
+  /// sub-directories (if specified by the selector), spawning more
+  /// `DiscoveryImplIterators`, which feed their data into a single producer.
+  class DiscoveryImplIterator {
+    const PlatformFilename dir_fn_;
+    const int32_t nesting_depth_;
+    const FileSelector selector_;
+    const uint32_t file_info_batch_size_;
+
+    AutoClosingProducer file_gen_producer_;
+    FileInfoVector current_chunk_;
+    std::vector<PlatformFilename> child_fns_;
+    size_t idx_ = 0;
+    bool initialized_ = false;
+
+   public:
+    DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+                          FileSelector selector, AutoClosingProducer 
file_gen_producer,
+                          uint32_t file_info_batch_size)
+        : dir_fn_(std::move(dir_fn)),
+          nesting_depth_(nesting_depth),
+          selector_(std::move(selector)),
+          file_info_batch_size_(file_info_batch_size),
+          file_gen_producer_(std::move(file_gen_producer)) {}
+
+    /// Pre-initialize the iterator by listing directory contents and caching
+    /// in the current instance.
+    Status Initialize() {
+      auto result = arrow::internal::ListDir(dir_fn_);
+      if (!result.ok()) {
+        auto status = result.status();
+        if (selector_.allow_not_found && status.IsIOError()) {
+          ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn_));
+          if (!exists) {
+            return Status::OK();
+          }
+        }
+        return status;
+      }
+      child_fns_ = result.MoveValueUnsafe();
+
+      const size_t dirent_count = child_fns_.size();
+      current_chunk_.reserve(dirent_count >= file_info_batch_size_ ? 
file_info_batch_size_
+                                                                   : 
dirent_count);
+
+      initialized_ = true;
+      return Status::OK();
+    }
+
+    Result<FileInfoVector> Next() {
+      if (!initialized_) {
+        auto init = Initialize();
+        if (!init.ok()) {
+          return Finish(init);
+        }
+      }
+      while (idx_ < child_fns_.size()) {
+        auto full_fn = dir_fn_.Join(child_fns_[idx_++]);
+        auto res = StatFile(full_fn.ToNative());
+        if (!res.ok()) {
+          return Finish(res.status());
+        }
+
+        auto info = res.MoveValueUnsafe();
+
+        // Try to recurse into subdirectories, if needed.
+        if (info.type() == FileType::Directory &&
+            nesting_depth_ < selector_.max_recursion && selector_.recursive) {
+          auto status = DoDiscovery(std::move(full_fn), nesting_depth_ + 1, 
selector_,
+                                    file_gen_producer_, file_info_batch_size_);
+          if (!status.ok()) {
+            return Finish(status);
+          }
+        }
+        // Everything is ok. Add the item to the current chunk of data.
+        current_chunk_.emplace_back(std::move(info));
+        // Keep `current_chunk_` as large, as `batch_size_`.
+        // Otherwise, yield the complete chunk to the caller.
+        if (current_chunk_.size() == file_info_batch_size_) {
+          FileInfoVector yield_vec;
+          std::swap(yield_vec, current_chunk_);

Review Comment:
   Can probably be shortened:
   ```suggestion
             auto yield_vec = std::move(current_chunk_);
   ```
   



##########
cpp/src/arrow/filesystem/localfs_benchmark.cc:
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/status.h>
+#include <arrow/testing/random.h>
+#include <arrow/util/async_generator.h>
+#include <arrow/util/string_view.h>
+#include <memory>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/formatting.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
+
+#include "parquet/arrow/writer.h"
+
+namespace arrow {
+
+namespace fs {
+
+using arrow::internal::make_unique;
+using arrow::internal::TemporaryDir;
+
+/// Set up hierarchical directory structure to test asynchronous
+/// file discovery interface (GetFileInfoGenerator()) in the LocalFileSystem
+/// class.
+///
+/// The main routine of the class is `InitializeDatasetStructure()`, which
+/// does the following:
+/// 1. Create `num_files_` trivial parquet files under specified root 
directory.
+/// 2. Create `num_dirs_` additional sub-directories in the current dir.
+/// 3. Check if the specified recursion limit is reached (controlled by 
`nesting_depth_`).
+///   a. Return if recursion limit reached.
+///   b. Recurse into each sub-directory and perform steps above, increasing 
current
+///      nesting level.
+///
+/// Parquet files creation is handled by `MakeTrivialParquetFile` function,
+/// which instantiates a trivial schema ({"val", int64()}) and inserts one
+/// random row, then writes the file to storage.
+class LocalFSFixture : public benchmark::Fixture {
+ public:
+  void SetUp(const benchmark::State& state) override {
+    ASSERT_OK_AND_ASSIGN(tmp_dir_, TemporaryDir::Make("localfs-test-"));
+
+    auto options = LocalFileSystemOptions::Defaults();
+    fs_ = make_unique<LocalFileSystem>(options);
+
+    InitializeDatasetStructure(0, tmp_dir_->path());
+  }
+
+  void InitializeDatasetStructure(size_t cur_nesting_level,
+                                  arrow::internal::PlatformFilename 
cur_root_dir) {
+    ASSERT_OK(arrow::internal::CreateDir(cur_root_dir));
+
+    arrow::internal::StringFormatter<Int32Type> format;
+    for (size_t i = 0; i < num_files_; ++i) {
+      std::string fname = "file_";
+      format(i, [&fname](util::string_view formatted) {
+        fname.append(formatted.data(), formatted.size());
+      });
+      fname.append(".parquet");
+      ASSERT_OK_AND_ASSIGN(auto path, cur_root_dir.Join(std::move(fname)));
+      ASSERT_OK(MakeTrivialParquetFile(path.ToString()));
+    }
+
+    if (cur_nesting_level == nesting_depth_) {
+      return;
+    }
+
+    for (size_t i = 0; i < num_dirs_; ++i) {
+      std::string dirname = "dir_";
+      format(i, [&dirname](util::string_view formatted) {
+        dirname.append(formatted.data(), formatted.size());
+      });
+      ASSERT_OK_AND_ASSIGN(auto path, cur_root_dir.Join(std::move(dirname)));
+      InitializeDatasetStructure(cur_nesting_level + 1, std::move(path));
+    }
+  }
+
+  Status MakeTrivialParquetFile(const std::string& path) {
+    FieldVector fields{field("val", int64())};
+    auto batch = random::GenerateBatch(fields, 1 /*num_rows*/, 0);
+    ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch}));
+
+    std::shared_ptr<io::OutputStream> sink;
+    ARROW_ASSIGN_OR_RAISE(sink, fs_->OpenOutputStream(path));
+
+    RETURN_NOT_OK(parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), sink,
+                                             1 /*num_rows*/));
+
+    return Status::OK();
+  }
+
+ protected:
+  std::unique_ptr<TemporaryDir> tmp_dir_;
+  std::unique_ptr<LocalFileSystem> fs_;
+
+  const size_t nesting_depth_ = 2;
+  const size_t num_dirs_ = 10;
+  const size_t num_files_ = 10000;
+};
+
+/// Benchmark for `LocalFileSystem::GetFileInfoGenerator()` performance.
+///
+/// The test function is executed for each combination (cartesian product)
+/// of input arguments tuple (directory_readahead, file_info_batch_size)
+/// to test both internal parallelism and batching.
+BENCHMARK_DEFINE_F(LocalFSFixture, AsyncFileDiscovery)
+(benchmark::State& st) {
+  for (auto _ : st) {
+    // Instantiate LocalFileSystem with custom options for directory readahead
+    // and file info batch size.
+    auto options = LocalFileSystemOptions::Defaults();
+    options.directory_readahead = static_cast<int32_t>(st.range(0));
+    options.file_info_batch_size = static_cast<int32_t>(st.range(1));
+    auto test_fs = make_unique<LocalFileSystem>(options);
+    // Create recursive FileSelector pointing to the root of the temporary
+    // directory, which was set up by the fixture earlier.
+    FileSelector select;
+    select.base_dir = tmp_dir_->path().ToString();
+    select.recursive = true;
+    auto file_gen = test_fs->GetFileInfoGenerator(std::move(select));
+    size_t total_file_count = 0;
+    // Trigger fetching from the generator and count all received FileInfo:s.
+    auto visit_fut =
+        VisitAsyncGenerator(file_gen, [&total_file_count](const 
FileInfoVector& fv) {
+          total_file_count += fv.size();
+          return Status::OK();
+        });
+    ASSERT_FINISHES_OK(visit_fut);
+    st.SetItemsProcessed(total_file_count);
+  }
+}
+BENCHMARK_REGISTER_F(LocalFSFixture, AsyncFileDiscovery)
+    ->ArgNames({"directory_readahead", "file_info_batch_size"})
+    ->ArgsProduct({{1, 2, 4, 8, 16}, {1, 10, 100, 1000}})

Review Comment:
   Can we perhaps cut down on the number of generated benchmarks? For example:
   ```suggestion
       ->ArgsProduct({{1, 4, 16}, {100, 1000}})
   ```
   
   (`1` for `file_info_batch_size` seems so obviously pessimal that it needn't 
be tested, what do you think?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to