pitrou commented on code in PR #13796:
URL: https://github.com/apache/arrow/pull/13796#discussion_r939956199
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,208 @@ Result<std::vector<FileInfo>>
LocalFileSystem::GetFileInfo(const FileSelector& s
return results;
}
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual
directories
+/// so that each directory results are yielded as a separate
`FileInfoGenerator` via
+/// an underlying `DiscoveryImplIterator`, which delivers items in chunks
(default size
+/// is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+/// `DiscoverDirectoryFiles`, with the difference that the results from
individual
+/// sub-directory iterators are merged into the single FileInfoGenerator
stream.
+///
+/// The implementation makes use of additional attributes in
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+ using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+ /// The main procedure to start async streaming discovery using a given
`FileSelector`.
+ ///
+ /// The result is a two-level generator, i.e. "generator of
FileInfoGenerator:s",
+ /// where each individual generator represents an FileInfo item stream from
coming an
+ /// individual sub-directory under the selector's `base_dir`.
+ static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ PushGenerator<FileInfoGenerator> file_gen;
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto base_dir,
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+ ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0,
std::move(selector),
+ file_gen.producer(), fs_opts.batch_size));
+
+ return file_gen;
+ }
+
+ /// Version of `DiscoverDirectoryFiles` which flattens the stream of
generators
+ /// into a single FileInfoGenerator stream.
+ /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine
how much
+ /// readahead should happen.
+ static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ int32_t dir_readahead = fs_opts.directory_readahead;
+ ARROW_ASSIGN_OR_RAISE(
+ auto part_gen, DiscoverDirectoryFiles(std::move(selector),
std::move(fs_opts)));
+ return dir_readahead > 1
+ ? MakeSequencedMergedGenerator(std::move(part_gen),
dir_readahead)
+ : MakeConcatenatedGenerator(std::move(part_gen));
+ }
+
+ private:
+ /// The class, which implements iterator interface to traverse a given
+ /// directory at the fixed nesting depth, and possibly recurses into
+ /// sub-directories (if specified by the selector), spawning more
+ /// `DiscoveryImplIterators`, which feed their data into a single producer.
+ class DiscoveryImplIterator {
+ const PlatformFilename dir_fn_;
+ const int32_t nesting_depth_;
+ const FileSelector selector_;
+ const uint32_t batch_size_;
+
+ FileInfoGeneratorProducer file_gen_producer_;
+ FileInfoVector current_chunk_;
+ std::vector<PlatformFilename> child_fns_;
+ size_t idx_ = 0;
+ bool initialized_ = false;
+
+ public:
+ DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+ FileSelector selector,
+ FileInfoGeneratorProducer file_gen_producer,
+ uint32_t batch_size)
+ : dir_fn_(std::move(dir_fn)),
+ nesting_depth_(nesting_depth),
+ selector_(std::move(selector)),
+ batch_size_(batch_size),
+ file_gen_producer_(std::move(file_gen_producer)) {}
+
+ /// Pre-initialize the iterator by listing directory contents and caching
+ /// in the current instance.
+ Status Initialize() {
+ auto result = arrow::internal::ListDir(dir_fn_);
+ if (!result.ok()) {
+ auto status = result.status();
+ if (selector_.allow_not_found && status.IsIOError()) {
+ auto exists = FileExists(dir_fn_);
+ if (exists.ok() && !*exists) {
+ return Status::OK();
+ } else {
+ return exists.ok() ? arrow::Status::UnknownError(
Review Comment:
If `exists` is ok (i.e. the check itself succeeded), you should simply propagate the original error instead of creating another one.
The final code would then probably look like this:
```c++
if (!result.ok()) {
if (selector_.allow_not_found && status.IsIOError()) {
ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn));
if (!exists) {
return Status::OK();
}
}
return status;
}
```
(incidentally, this is similar to the snippet in `StatSelector()`)
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,208 @@ Result<std::vector<FileInfo>>
LocalFileSystem::GetFileInfo(const FileSelector& s
return results;
}
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual
directories
+/// so that each directory results are yielded as a separate
`FileInfoGenerator` via
+/// an underlying `DiscoveryImplIterator`, which delivers items in chunks
(default size
+/// is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+/// `DiscoverDirectoryFiles`, with the difference that the results from
individual
+/// sub-directory iterators are merged into the single FileInfoGenerator
stream.
+///
+/// The implementation makes use of additional attributes in
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+ using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+ /// The main procedure to start async streaming discovery using a given
`FileSelector`.
+ ///
+ /// The result is a two-level generator, i.e. "generator of
FileInfoGenerator:s",
+ /// where each individual generator represents an FileInfo item stream from
coming an
+ /// individual sub-directory under the selector's `base_dir`.
+ static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ PushGenerator<FileInfoGenerator> file_gen;
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto base_dir,
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+ ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0,
std::move(selector),
+ file_gen.producer(), fs_opts.batch_size));
+
+ return file_gen;
+ }
+
+ /// Version of `DiscoverDirectoryFiles` which flattens the stream of
generators
+ /// into a single FileInfoGenerator stream.
+ /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine
how much
+ /// readahead should happen.
+ static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ int32_t dir_readahead = fs_opts.directory_readahead;
+ ARROW_ASSIGN_OR_RAISE(
+ auto part_gen, DiscoverDirectoryFiles(std::move(selector),
std::move(fs_opts)));
+ return dir_readahead > 1
+ ? MakeSequencedMergedGenerator(std::move(part_gen),
dir_readahead)
+ : MakeConcatenatedGenerator(std::move(part_gen));
+ }
+
+ private:
+ /// The class, which implements iterator interface to traverse a given
+ /// directory at the fixed nesting depth, and possibly recurses into
+ /// sub-directories (if specified by the selector), spawning more
+ /// `DiscoveryImplIterators`, which feed their data into a single producer.
+ class DiscoveryImplIterator {
+ const PlatformFilename dir_fn_;
+ const int32_t nesting_depth_;
+ const FileSelector selector_;
+ const uint32_t batch_size_;
+
+ FileInfoGeneratorProducer file_gen_producer_;
+ FileInfoVector current_chunk_;
+ std::vector<PlatformFilename> child_fns_;
+ size_t idx_ = 0;
+ bool initialized_ = false;
+
+ public:
+ DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+ FileSelector selector,
+ FileInfoGeneratorProducer file_gen_producer,
+ uint32_t batch_size)
+ : dir_fn_(std::move(dir_fn)),
+ nesting_depth_(nesting_depth),
+ selector_(std::move(selector)),
+ batch_size_(batch_size),
+ file_gen_producer_(std::move(file_gen_producer)) {}
+
+ /// Pre-initialize the iterator by listing directory contents and caching
+ /// in the current instance.
+ Status Initialize() {
+ auto result = arrow::internal::ListDir(dir_fn_);
+ if (!result.ok()) {
+ auto status = result.status();
+ if (selector_.allow_not_found && status.IsIOError()) {
+ auto exists = FileExists(dir_fn_);
Review Comment:
I think here you can simply propagate the error, which will simplify the code
below.
```suggestion
ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn));
```
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,208 @@ Result<std::vector<FileInfo>>
LocalFileSystem::GetFileInfo(const FileSelector& s
return results;
}
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual
directories
+/// so that each directory results are yielded as a separate
`FileInfoGenerator` via
+/// an underlying `DiscoveryImplIterator`, which delivers items in chunks
(default size
+/// is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+/// `DiscoverDirectoryFiles`, with the difference that the results from
individual
+/// sub-directory iterators are merged into the single FileInfoGenerator
stream.
+///
+/// The implementation makes use of additional attributes in
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+ using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+ /// The main procedure to start async streaming discovery using a given
`FileSelector`.
+ ///
+ /// The result is a two-level generator, i.e. "generator of
FileInfoGenerator:s",
+ /// where each individual generator represents an FileInfo item stream from
coming an
+ /// individual sub-directory under the selector's `base_dir`.
+ static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ PushGenerator<FileInfoGenerator> file_gen;
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto base_dir,
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+ ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0,
std::move(selector),
+ file_gen.producer(), fs_opts.batch_size));
+
+ return file_gen;
+ }
+
+ /// Version of `DiscoverDirectoryFiles` which flattens the stream of
generators
+ /// into a single FileInfoGenerator stream.
+ /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine
how much
+ /// readahead should happen.
+ static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ int32_t dir_readahead = fs_opts.directory_readahead;
+ ARROW_ASSIGN_OR_RAISE(
+ auto part_gen, DiscoverDirectoryFiles(std::move(selector),
std::move(fs_opts)));
+ return dir_readahead > 1
+ ? MakeSequencedMergedGenerator(std::move(part_gen),
dir_readahead)
+ : MakeConcatenatedGenerator(std::move(part_gen));
+ }
+
+ private:
+ /// The class, which implements iterator interface to traverse a given
+ /// directory at the fixed nesting depth, and possibly recurses into
+ /// sub-directories (if specified by the selector), spawning more
+ /// `DiscoveryImplIterators`, which feed their data into a single producer.
+ class DiscoveryImplIterator {
+ const PlatformFilename dir_fn_;
+ const int32_t nesting_depth_;
+ const FileSelector selector_;
+ const uint32_t batch_size_;
+
+ FileInfoGeneratorProducer file_gen_producer_;
+ FileInfoVector current_chunk_;
+ std::vector<PlatformFilename> child_fns_;
+ size_t idx_ = 0;
+ bool initialized_ = false;
+
+ public:
+ DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+ FileSelector selector,
+ FileInfoGeneratorProducer file_gen_producer,
+ uint32_t batch_size)
+ : dir_fn_(std::move(dir_fn)),
+ nesting_depth_(nesting_depth),
+ selector_(std::move(selector)),
+ batch_size_(batch_size),
+ file_gen_producer_(std::move(file_gen_producer)) {}
+
+ /// Pre-initialize the iterator by listing directory contents and caching
+ /// in the current instance.
+ Status Initialize() {
+ auto result = arrow::internal::ListDir(dir_fn_);
+ if (!result.ok()) {
+ auto status = result.status();
+ if (selector_.allow_not_found && status.IsIOError()) {
+ auto exists = FileExists(dir_fn_);
+ if (exists.ok() && !*exists) {
+ return Status::OK();
+ } else {
+ return exists.ok() ? arrow::Status::UnknownError(
+ "Failed to discover directory: ",
dir_fn_.ToString())
+ : exists.status();
+ }
+ }
+ return status;
+ }
+ child_fns_ = result.MoveValueUnsafe();
+
+ const size_t dirent_count = child_fns_.size();
+ current_chunk_.reserve(dirent_count >= batch_size_ ? batch_size_ :
dirent_count);
+
+ initialized_ = true;
+ return Status::OK();
+ }
+
+ Result<FileInfoVector> Next() {
+ if (!initialized_) {
+ auto init = Initialize();
+ if (!init.ok()) {
+ return Finish(init);
+ }
+ }
+ while (idx_ < child_fns_.size()) {
+ auto full_fn = dir_fn_.Join(child_fns_[idx_++]);
+ auto res = StatFile(full_fn.ToNative());
+ if (!res.ok()) {
+ return Finish(res.status());
+ }
+
+ auto info = res.MoveValueUnsafe();
+
+ // Try to recurse into subdirectories, if needed.
+ if (info.type() == FileType::Directory &&
+ nesting_depth_ < selector_.max_recursion && selector_.recursive) {
+ auto status = DoDiscovery(std::move(full_fn), nesting_depth_ + 1,
selector_,
+ file_gen_producer_, batch_size_);
+ if (!status.ok()) {
+ return Finish(status);
+ }
+ }
+ // Everything is ok. Add the item to the current chunk of data.
+ current_chunk_.emplace_back(std::move(info));
+ // Keep `current_chunk_` as large, as `batch_size_`.
+ // Otherwise, yield the complete chunk to the caller.
+ if (current_chunk_.size() == batch_size_) {
+ FileInfoVector yield_vec;
+ std::swap(yield_vec, current_chunk_);
+ const size_t items_left = child_fns_.size() - idx_;
+ current_chunk_.reserve(items_left >= batch_size_ ? batch_size_ :
items_left);
+ return yield_vec;
+ }
+ } // while (idx_ < child_fns_.size())
+
+ // Flush out remaining items
+ if (!current_chunk_.empty()) {
+ FileInfoVector yield_vec;
+ std::swap(yield_vec, current_chunk_);
+ return yield_vec;
+ }
+ return Finish();
+ }
+
+ private:
+ /// Close the producer end of stream and return iteration end marker.
+ Result<FileInfoVector> Finish(Status status = Status::OK()) {
+ file_gen_producer_.Close();
+ ARROW_RETURN_NOT_OK(status);
+ return IterationEnd<FileInfoVector>();
+ }
+ };
+
+ /// Create an instance of `DiscoveryImplIterator` under the hood for the
+ /// specified directory, wrap it in the `BackgroundGenerator +
TransferredGenerator`
+ /// bundle and feed the results to the main producer queue.
+ static Status DoDiscovery(const PlatformFilename& dir_fn, int32_t
nesting_depth,
+ FileSelector selector,
+ FileInfoGeneratorProducer file_gen_producer,
+ uint32_t batch_size) {
+ ARROW_RETURN_IF(file_gen_producer.is_closed(),
+ arrow::Status::Cancelled("Discovery cancelled"));
Review Comment:
When does this occur?
##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -34,10 +34,24 @@ namespace fs {
/// Options for the LocalFileSystem implementation.
struct ARROW_EXPORT LocalFileSystemOptions {
+ static constexpr uint32_t kDefaultDirectoryReadahead = 1u;
+ static constexpr uint32_t kDefaultBatchSize = 1000u;
Review Comment:
Style nit: let's avoid gratuitous use of unsigned integers. These can be
`int` or `int32_t`.
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -309,6 +314,208 @@ Result<std::vector<FileInfo>>
LocalFileSystem::GetFileInfo(const FileSelector& s
return results;
}
+namespace {
+
+/// Workhorse for streaming async implementation of `GetFileInfo`
+/// (`GetFileInfoGenerator`).
+///
+/// There are two variants of async discovery functions suported:
+/// 1. `DiscoverDirectoryFiles`, which parallelizes traversal of individual
directories
+/// so that each directory results are yielded as a separate
`FileInfoGenerator` via
+/// an underlying `DiscoveryImplIterator`, which delivers items in chunks
(default size
+/// is 1K items).
+/// 2. `DiscoverDirectoriesFlattened`, which forwards execution to the
+/// `DiscoverDirectoryFiles`, with the difference that the results from
individual
+/// sub-directory iterators are merged into the single FileInfoGenerator
stream.
+///
+/// The implementation makes use of additional attributes in
`LocalFileSystemOptions`,
+/// such as `directory_readahead`, which can be used to tune algorithm
+/// behavior and adjust how many directories can be processed in parallel.
+/// This option is disabled by default, so that individual directories are
processed
+/// in serial manner via `MakeConcatenatedGenerator` under the hood.
+class AsyncStatSelector {
+ public:
+ using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
+
+ /// The main procedure to start async streaming discovery using a given
`FileSelector`.
+ ///
+ /// The result is a two-level generator, i.e. "generator of
FileInfoGenerator:s",
+ /// where each individual generator represents an FileInfo item stream from
coming an
+ /// individual sub-directory under the selector's `base_dir`.
+ static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ PushGenerator<FileInfoGenerator> file_gen;
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto base_dir,
arrow::internal::PlatformFilename::FromString(selector.base_dir));
+ ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0,
std::move(selector),
+ file_gen.producer(), fs_opts.batch_size));
+
+ return file_gen;
+ }
+
+ /// Version of `DiscoverDirectoryFiles` which flattens the stream of
generators
+ /// into a single FileInfoGenerator stream.
+ /// Makes use of `LocalFileSystemOptions::directory_readahead` to determine
how much
+ /// readahead should happen.
+ static arrow::Result<FileInfoGenerator> DiscoverDirectoriesFlattened(
+ FileSelector selector, LocalFileSystemOptions fs_opts) {
+ int32_t dir_readahead = fs_opts.directory_readahead;
+ ARROW_ASSIGN_OR_RAISE(
+ auto part_gen, DiscoverDirectoryFiles(std::move(selector),
std::move(fs_opts)));
+ return dir_readahead > 1
+ ? MakeSequencedMergedGenerator(std::move(part_gen),
dir_readahead)
+ : MakeConcatenatedGenerator(std::move(part_gen));
+ }
+
+ private:
+ /// The class, which implements iterator interface to traverse a given
+ /// directory at the fixed nesting depth, and possibly recurses into
+ /// sub-directories (if specified by the selector), spawning more
+ /// `DiscoveryImplIterators`, which feed their data into a single producer.
+ class DiscoveryImplIterator {
+ const PlatformFilename dir_fn_;
+ const int32_t nesting_depth_;
+ const FileSelector selector_;
+ const uint32_t batch_size_;
+
+ FileInfoGeneratorProducer file_gen_producer_;
+ FileInfoVector current_chunk_;
+ std::vector<PlatformFilename> child_fns_;
+ size_t idx_ = 0;
+ bool initialized_ = false;
+
+ public:
+ DiscoveryImplIterator(PlatformFilename dir_fn, int32_t nesting_depth,
+ FileSelector selector,
+ FileInfoGeneratorProducer file_gen_producer,
+ uint32_t batch_size)
+ : dir_fn_(std::move(dir_fn)),
+ nesting_depth_(nesting_depth),
+ selector_(std::move(selector)),
+ batch_size_(batch_size),
+ file_gen_producer_(std::move(file_gen_producer)) {}
+
+ /// Pre-initialize the iterator by listing directory contents and caching
+ /// in the current instance.
+ Status Initialize() {
+ auto result = arrow::internal::ListDir(dir_fn_);
+ if (!result.ok()) {
+ auto status = result.status();
+ if (selector_.allow_not_found && status.IsIOError()) {
+ auto exists = FileExists(dir_fn_);
+ if (exists.ok() && !*exists) {
+ return Status::OK();
+ } else {
+ return exists.ok() ? arrow::Status::UnknownError(
+ "Failed to discover directory: ",
dir_fn_.ToString())
+ : exists.status();
+ }
+ }
+ return status;
+ }
+ child_fns_ = result.MoveValueUnsafe();
+
+ const size_t dirent_count = child_fns_.size();
+ current_chunk_.reserve(dirent_count >= batch_size_ ? batch_size_ :
dirent_count);
+
+ initialized_ = true;
+ return Status::OK();
+ }
+
+ Result<FileInfoVector> Next() {
+ if (!initialized_) {
+ auto init = Initialize();
+ if (!init.ok()) {
+ return Finish(init);
+ }
+ }
+ while (idx_ < child_fns_.size()) {
+ auto full_fn = dir_fn_.Join(child_fns_[idx_++]);
+ auto res = StatFile(full_fn.ToNative());
+ if (!res.ok()) {
+ return Finish(res.status());
+ }
+
+ auto info = res.MoveValueUnsafe();
+
+ // Try to recurse into subdirectories, if needed.
+ if (info.type() == FileType::Directory &&
+ nesting_depth_ < selector_.max_recursion && selector_.recursive) {
+ auto status = DoDiscovery(std::move(full_fn), nesting_depth_ + 1,
selector_,
+ file_gen_producer_, batch_size_);
+ if (!status.ok()) {
+ return Finish(status);
+ }
+ }
+ // Everything is ok. Add the item to the current chunk of data.
+ current_chunk_.emplace_back(std::move(info));
+ // Keep `current_chunk_` as large, as `batch_size_`.
+ // Otherwise, yield the complete chunk to the caller.
+ if (current_chunk_.size() == batch_size_) {
+ FileInfoVector yield_vec;
+ std::swap(yield_vec, current_chunk_);
+ const size_t items_left = child_fns_.size() - idx_;
+ current_chunk_.reserve(items_left >= batch_size_ ? batch_size_ :
items_left);
+ return yield_vec;
+ }
+ } // while (idx_ < child_fns_.size())
+
+ // Flush out remaining items
+ if (!current_chunk_.empty()) {
+ FileInfoVector yield_vec;
+ std::swap(yield_vec, current_chunk_);
+ return yield_vec;
Review Comment:
Why not simply:
```suggestion
return std::move(current_chunk_);
```
##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -34,10 +34,24 @@ namespace fs {
/// Options for the LocalFileSystem implementation.
struct ARROW_EXPORT LocalFileSystemOptions {
+ static constexpr uint32_t kDefaultDirectoryReadahead = 1u;
+ static constexpr uint32_t kDefaultBatchSize = 1000u;
+
/// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
/// or a regular one.
bool use_mmap = false;
+ /// Options related to `GetFileSystemGenerator` interface.
+
+ /// How many directories should be processed in parallel
+ /// by the `GetFileSystemGenerator` impl.
+ uint32_t directory_readahead = kDefaultDirectoryReadahead;
+ /// Specifies how much entries shall be aggregated into
+ /// a single FileInfoVector chunk by the `GetFileSystemGenerator` impl, which
+ /// is the result of `stat`:ing individual dirents, obtained by the call to
+ /// `internal::ListDir`.
+ uint32_t batch_size = kDefaultBatchSize;
Review Comment:
Rename this to `file_info_batch_size` for clarity?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]