westonpace commented on a change in pull request #9995:
URL: https://github.com/apache/arrow/pull/9995#discussion_r611990476



##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();
+
+    auto fut = impl_->ListBucketsAsync(io_context());
+    auto impl = impl_->shared_from_this();
+    fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+          if (!res.ok()) {
+            producer.Push(res.status());
+            producer.Close();
+            return;
+          }
+          FileInfoVector buckets;
+          for (const auto& bucket : *res) {
+            buckets.push_back(FileInfo{bucket, FileType::Directory});
+          }
+          // Generate all bucket infos
+          producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));
+          if (select.recursive) {
+            // Generate recursive walk for each bucket in turn
+            for (const auto& bucket : buckets) {
+              producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+            }
+          }
+          producer.Close();
+        });
+
+    return MakeConcatenatedGenerator(

Review comment:
       You're right; I forgot that this is disconnected via `PushGenerator`, so this will indeed run the buckets in parallel.
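
   To spell out what I mean by "disconnected": `PushGenerator` decouples the producer side from the consumer side, so values can be pushed on the producer's schedule regardless of when the consumer pulls.  A minimal sketch of that decoupling (not taken from the PR itself):

```cpp
// Minimal sketch of the PushGenerator producer/consumer decoupling.
#include <arrow/util/async_generator.h>

arrow::AsyncGenerator<int> MakeDecoupledExample() {
  arrow::PushGenerator<int> gen;
  auto producer = gen.producer();
  // The producer pushes whenever it likes (e.g. from a future callback on
  // another thread); it never waits for the consumer to request a value.
  producer.Push(1);
  producer.Push(2);
  producer.Close();
  // The consumer just sees an AsyncGenerator<int> and pulls futures from it
  // at its own pace.
  return gen;
}
```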
   
   As for `max_subscriptions`, it represents how many `AsyncGenerator<T>` we will process at once.  A concrete example might be easiest: in the scanner we convert each file to read into an `AsyncGenerator<RecordBatch>`.  These are then combined into an `AsyncGenerator<AsyncGenerator<RecordBatch>>`.  However, we don't want to read all files at the same time, mostly because there isn't much point (except possibly on S3), but also to limit the amount of RAM used (e.g. in CSV each file gets a background reader that reads ahead up to some number of blocks, so if you have 10k files...).
   
   So we only read from `options->fileReadahead` files at once, and this becomes `max_subscriptions`.  A `subscription` is an individual "inner" `AsyncGenerator<RecordBatch>` pulled off the "outer" `AsyncGenerator<AsyncGenerator<RecordBatch>>` generator.
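
   Roughly, the merging looks like the sketch below.  This is just an illustration of the shape, not the scanner's actual code: `OpenFileAsBatchGenerator` and `ScanFiles` are made-up placeholder names, and the real code spells the element type as `std::shared_ptr<RecordBatch>`.

```cpp
// Sketch: merging per-file batch generators with a bounded number of
// concurrent "subscriptions".  OpenFileAsBatchGenerator and ScanFiles are
// placeholders, not Arrow APIs.
#include <memory>
#include <string>
#include <vector>

#include <arrow/record_batch.h>
#include <arrow/util/async_generator.h>

using RecordBatchGenerator =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

// Placeholder: a real implementation would open `path` and yield its batches.
// Here it yields nothing so the sketch stays self-contained.
RecordBatchGenerator OpenFileAsBatchGenerator(const std::string& path) {
  return arrow::MakeVectorGenerator(
      std::vector<std::shared_ptr<arrow::RecordBatch>>{});
}

RecordBatchGenerator ScanFiles(const std::vector<std::string>& paths,
                               int file_readahead) {
  // One "inner" generator per file...
  std::vector<RecordBatchGenerator> inner;
  for (const auto& path : paths) {
    inner.push_back(OpenFileAsBatchGenerator(path));
  }
  // ...wrapped in an "outer" generator of generators.
  auto outer = arrow::MakeVectorGenerator(std::move(inner));
  // Only `file_readahead` inner generators are subscribed to (actively
  // pulled from) at any one time; the others are not touched until a
  // subscription slot frees up.
  return arrow::MakeMergedGenerator(std::move(outer),
                                    /*max_subscriptions=*/file_readahead);
}
```

   With `max_subscriptions = 1` this degenerates to draining one inner generator at a time, which is essentially the concatenation behavior used in the diff above.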
   
   These terms (`subscription`, `inner`, and `outer`) are all borrowed from Rx's equivalent: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap
   
   In Rx the parameter is called `concurrent`, but that seemed a little vague.



