westonpace commented on a change in pull request #9995:
URL: https://github.com/apache/arrow/pull/9995#discussion_r611990476
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
+
+ auto fut = impl_->ListBucketsAsync(io_context());
+ auto impl = impl_->shared_from_this();
+ fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+ if (!res.ok()) {
+ producer.Push(res.status());
+ producer.Close();
+ return;
+ }
+ FileInfoVector buckets;
+ for (const auto& bucket : *res) {
+ buckets.push_back(FileInfo{bucket, FileType::Directory});
+ }
+ // Generate all bucket infos
+
+          producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));
+ if (select.recursive) {
+ // Generate recursive walk for each bucket in turn
+ for (const auto& bucket : buckets) {
+ producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+ }
+ }
+ producer.Close();
+ });
+
+ return MakeConcatenatedGenerator(
Review comment:
You're right, I forgot that this is disconnected via `PushGenerator`, so yes, this will run the buckets in parallel.
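For anyone skimming this later, here is a tiny standalone sketch of why the sequential, concatenated consumption still overlaps the bucket walks. It uses plain C++ futures rather than the Arrow generator API, and the bucket count and sleep are made up for illustration: each pushed task has already started by the time the consumer drains it, just like `WalkAsync` starts its listing as soon as it is pushed into the `PushGenerator`.

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  using namespace std::chrono_literals;
  // Stands in for the generators pushed into the PushGenerator.
  std::vector<std::future<int>> pushed;
  for (int bucket = 0; bucket < 3; ++bucket) {
    // std::launch::async starts the "walk" as soon as it is pushed,
    // analogous to WalkAsync kicking off its bucket listing immediately.
    pushed.push_back(std::async(std::launch::async, [bucket] {
      std::this_thread::sleep_for(100ms);  // pretend to list one bucket
      return bucket;
    }));
  }
  // Sequential ("concatenated") consumption: this loop finishes in roughly
  // 100ms rather than 300ms, because all three walks were already running
  // in parallel before we started draining them.
  for (auto& fut : pushed) {
    std::cout << "bucket " << fut.get() << " done\n";
  }
  return 0;
}
```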
As for `max_subscriptions`, it represents how many `AsyncGenerator<T>` instances we will process at once. A concrete example might be easiest: in the scanner we convert each file to be read into an `AsyncGenerator<RecordBatch>`, and these are then combined into an `AsyncGenerator<AsyncGenerator<RecordBatch>>`. However, we don't want to read all the files at the same time, mostly because there isn't much point (except possibly on S3), but also to limit the amount of RAM used (e.g. in CSV each file gets a background reader which reads ahead up to some number of blocks, which adds up quickly if you have 10k files).
So we only read from `options->fileReadahead` files at once, and this becomes `max_subscriptions`. A `subscription` is an individual "inner" `AsyncGenerator<RecordBatch>` pulled off the "outer" `AsyncGenerator<AsyncGenerator<RecordBatch>>` generator.
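In case the vocabulary is easier to see in code, here is a deliberately simplified, synchronous sketch of the idea. It is not the real `AsyncGenerator` API; `Batch`, `FileSource`, and `MergeAll` are made-up names, and the real readers are pull-based and asynchronous. The point is just that the outer sequence of per-file sources is drained while keeping at most `max_subscriptions` of them open ("subscribed") at any time:

```cpp
#include <deque>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using Batch = std::string;
// An "inner" generator (one file's batches): returns std::nullopt when done.
using FileSource = std::function<std::optional<Batch>()>;

std::vector<Batch> MergeAll(std::deque<FileSource> outer, int max_subscriptions) {
  std::vector<Batch> out;
  std::vector<FileSource> active;  // the inner sources we are "subscribed" to
  while (!outer.empty() || !active.empty()) {
    // Open more inner sources, but never more than max_subscriptions at once;
    // the remaining entries in `outer` are not touched (no reader, no RAM).
    while (static_cast<int>(active.size()) < max_subscriptions && !outer.empty()) {
      active.push_back(std::move(outer.front()));
      outer.pop_front();
    }
    // Pull one batch from each active subscription; drop exhausted ones so a
    // new file can be subscribed on the next pass.
    std::vector<FileSource> still_active;
    for (auto& src : active) {
      if (auto batch = src()) {
        out.push_back(*batch);
        still_active.push_back(std::move(src));
      }
    }
    active = std::move(still_active);
  }
  return out;
}
```

With 10k CSV files and `max_subscriptions` set to the readahead limit, only that many per-file readers ever exist at once; the rest of the outer sequence is never even opened.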
These terms (`subscription`, `inner`, and `outer`) are all borrowed from Rx's equivalent: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap. In Rx the parameter is called `concurrent`, but that seemed a little vague.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org