westonpace commented on a change in pull request #9995:
URL: https://github.com/apache/arrow/pull/9995#discussion_r611712317



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_si
   return MakeGeneratorIterator(std::move(owned_bg_generator));
 }
 
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {

Review comment:
       The Rx name for this is `of`, but that's pretty peculiar and wouldn't 
fit here.  Also, we haven't been following their naming scheme too closely 
anyway, since their names tend to be unintuitive.  I think this name is clear, 
so this is just an FYI.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_si
   return MakeGeneratorIterator(std::move(owned_bg_generator));
 }
 
+/// \brief Make a generator that returns a single pre-generated future

Review comment:
       Can you add a note that this generator is async-reentrant?  I hope no 
one would ever intentionally add readahead to this, but I could see a situation 
where a generator is sometimes created from a known (cached) value and 
sometimes has to be fetched from a source that could use readahead.  Also, it 
helps with the consistency of the `MakeXyz` functions.

##########
File path: cpp/src/arrow/filesystem/filesystem.h
##########
@@ -141,6 +142,19 @@ struct ARROW_EXPORT FileLocator {
   std::string path;
 };
 
+using FileInfoVector = std::vector<FileInfo>;
+using FileInfoGenerator = std::function<Future<FileInfoVector>()>;
+
+}  // namespace fs
+
+template <>
+struct IterationTraits<fs::FileInfoVector> {

Review comment:
       Should we just bite the bullet and add `struct 
IterationTraits<std::vector<T>>`?
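
   A minimal sketch of what a generic specialization might look like.  The 
`IterationTraits` primary template below is a stand-in declared only for this 
example, `FileInfoStub` is a hypothetical placeholder for `fs::FileInfo`, and 
treating an empty vector as the end marker is an assumption.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for the IterationTraits primary template (declared here only so
// the sketch compiles on its own; this is not Arrow's header).
template <typename T>
struct IterationTraits;

// Generic partial specialization: an empty vector marks the end of iteration.
// Assumption: producers never emit an empty batch as a real value.
template <typename T>
struct IterationTraits<std::vector<T>> {
  static std::vector<T> End() { return {}; }
  static bool IsEnd(const std::vector<T>& val) { return val.empty(); }
};

// Hypothetical element type standing in for fs::FileInfo.
struct FileInfoStub {
  std::string path;
};
using FileInfoVectorStub = std::vector<FileInfoStub>;
```

   With this in place, `IterationTraits<FileInfoVectorStub>` needs no 
dedicated specialization; any `std::vector<T>` picks up the same end marker.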

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();
+
+    auto fut = impl_->ListBucketsAsync(io_context());
+    auto impl = impl_->shared_from_this();
+    fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+          if (!res.ok()) {
+            producer.Push(res.status());
+            producer.Close();
+            return;
+          }
+          FileInfoVector buckets;
+          for (const auto& bucket : *res) {
+            buckets.push_back(FileInfo{bucket, FileType::Directory});
+          }
+          // Generate all bucket infos
+          producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));
+          if (select.recursive) {
+            // Generate recursive walk for each bucket in turn
+            for (const auto& bucket : buckets) {
+              producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+            }
+          }
+          producer.Close();
+        });
+
+    return MakeConcatenatedGenerator(

Review comment:
       Why concatenated instead of merged?  Concatenated will run one bucket at 
a time while merged can run multiple.
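
   To illustrate the ordering difference, here is a synchronous toy model.  
`Source`, `FromVector`, `DrainConcatenated` and `DrainMerged` are hypothetical 
names for this sketch only, and round-robin polling is merely one possible 
interleaving; a real merged generator would yield items as they complete.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Synchronous stand-in for an async generator: writes the next value to *out
// and returns true, or returns false once the source is exhausted.
using Source = std::function<bool(int*)>;

Source FromVector(std::vector<int> values) {
  auto state = std::make_shared<std::vector<int>>(std::move(values));
  auto pos = std::make_shared<std::size_t>(0);
  return [state, pos](int* out) {
    if (*pos >= state->size()) return false;
    *out = (*state)[(*pos)++];
    return true;
  };
}

// Concatenation: fully drain each source before touching the next one.
std::vector<int> DrainConcatenated(std::vector<Source> sources) {
  std::vector<int> out;
  for (auto& source : sources) {
    int value = 0;
    while (source(&value)) out.push_back(value);
  }
  return out;
}

// Merge, modeled as round-robin: every live source advances on each pass.
std::vector<int> DrainMerged(std::vector<Source> sources) {
  std::vector<int> out;
  bool any_live = true;
  while (any_live) {
    any_live = false;
    for (auto& source : sources) {
      int value = 0;
      if (source(&value)) {
        out.push_back(value);
        any_live = true;
      }
    }
  }
  return out;
}
```

   In this model the second source is not touched by the concatenated drain 
until the first is empty, whereas the merged drain advances both on every pass.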

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_si
   return MakeGeneratorIterator(std::move(owned_bg_generator));
 }
 
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+  assert(future.is_valid());
+  struct Generator {
+    Future<T> future;
+
+    Future<T> operator()() {
+      if (future.is_valid()) {
+        return std::move(future);
+      } else {
+        return IterationEnd<T>();

Review comment:
       I've been using `AsyncGeneratorEnd`.  I could probably consider merging 
`AsyncGeneratorEnd` into `IterationEnd`.  It ties back to the more general 
question about whether we want to support the implicit conversion from `T` to 
`Future<T>` via `Future<T>::MakeFinished`.
   
   I think when I asked earlier there was some preference for calling 
`Future<T>::MakeFinished` explicitly where possible, but since the conversion 
constructor was added I've noticed a few spots where that has been missed.
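
   A toy sketch of the explicit style.  `ToyFuture` is a stand-in written for 
this example (always finished, no callbacks), not Arrow's `Future`, and using a 
default-constructed `T` as the end marker is an assumption.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Toy stand-in for a future: always already finished, and valid only while
// it holds shared state.  Not Arrow's real class.
template <typename T>
class ToyFuture {
 public:
  ToyFuture() = default;

  static ToyFuture MakeFinished(T value) {
    ToyFuture fut;
    fut.state_ = std::make_shared<T>(std::move(value));
    return fut;
  }

  bool is_valid() const { return state_ != nullptr; }
  const T& result() const { return *state_; }

 private:
  std::shared_ptr<T> state_;
};

// Returns the given future on the first call, then an end marker forever.
template <typename T>
std::function<ToyFuture<T>()> MakeSingleFutureGeneratorSketch(ToyFuture<T> future) {
  assert(future.is_valid());
  struct Generator {
    ToyFuture<T> future;

    ToyFuture<T> operator()() {
      if (future.is_valid()) {
        // Moving out leaves the member invalid (a moved-from shared_ptr is null).
        return std::move(future);
      }
      // Explicit wrapping: no implicit T -> ToyFuture<T> conversion is used.
      return ToyFuture<T>::MakeFinished(T{});
    }
  };
  return Generator{std::move(future)};
}
```

   Because `ToyFuture` has no converting constructor, writing `return T{};` 
would not compile here, which is the kind of check the explicit style gives.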

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();

Review comment:
       Maybe make a future JIRA to abandon the walk if all references to `gen` 
are lost?
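
   One possible shape for this, sketched with standard-library types only.  
`SharedQueue`, `WeakProducer` and `WalkUntilAbandoned` are hypothetical names; 
how `PushGenerator` actually shares state with its producer is not shown in 
this diff, so the weak-reference design is an assumption.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical queue shared between a consumer-side handle and a producer.
struct SharedQueue {
  std::deque<int> items;
};

// The producer holds only a weak reference, so it can notice when every
// consumer-side handle has been destroyed.
class WeakProducer {
 public:
  explicit WeakProducer(const std::shared_ptr<SharedQueue>& queue) : queue_(queue) {}

  // Returns false if the consumer is gone, signalling the walk to stop.
  bool Push(int value) {
    std::shared_ptr<SharedQueue> queue = queue_.lock();
    if (!queue) return false;
    queue->items.push_back(value);
    return true;
  }

 private:
  std::weak_ptr<SharedQueue> queue_;
};

// Simulated walk that stops early once Push reports the consumer is gone.
// The consumer handle is dropped just before push number `drop_after`.
int WalkUntilAbandoned(int total, int drop_after) {
  auto consumer = std::make_shared<SharedQueue>();
  WeakProducer producer(consumer);
  int pushed = 0;
  for (int i = 0; i < total; ++i) {
    if (i == drop_after) consumer.reset();
    if (!producer.Push(i)) break;
    ++pushed;
  }
  return pushed;
}
```

   For example, `WalkUntilAbandoned(10, 3)` stops after three pushes instead 
of walking all ten entries.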

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_si
   return MakeGeneratorIterator(std::move(owned_bg_generator));
 }
 
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+  assert(future.is_valid());
+  struct Generator {
+    Future<T> future;
+
+    Future<T> operator()() {
+      if (future.is_valid()) {
+        return std::move(future);
+      } else {
+        return IterationEnd<T>();
+      }
+    }
+  };
+  return Generator{std::move(future)};
+}
+
+/// \brief Make a generator that always fails with a given error
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(Status st) {

Review comment:
       Another FYI, this would be called `throw` in Rx.  We could name it 
`raise` but `raise` doesn't really combine with `Make` and I think consistency 
with `Make` would be nicer here.  No change recommended.
   
   Also, I've been tempted to create ARROW-12341 for a bit now.  It will use 
this.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();
+
+    auto fut = impl_->ListBucketsAsync(io_context());
+    auto impl = impl_->shared_from_this();
+    fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+          if (!res.ok()) {
+            producer.Push(res.status());

Review comment:
       Does `PushGenerator` not auto-close on a failing status?  If not, can we 
add a `Close(Status)`?
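
   A rough sketch of the suggested overload on a toy producer.  `ToyStatus` 
and `ToyProducer` are stand-ins written for this example, not Arrow's `Status` 
or the real `PushGenerator` producer, and `Close(ToyStatus)` is the 
hypothetical addition.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for a status object (not the real class).
struct ToyStatus {
  bool is_ok;
  std::string message;
  bool ok() const { return is_ok; }
};

// Toy producer: values are accepted until the producer is closed.
class ToyProducer {
 public:
  // Returns false if the producer was already closed.
  bool Push(int value) {
    if (closed_) return false;
    values_.push_back(value);
    return true;
  }

  // Plain close: the stream ended normally.
  bool Close() { return Close(ToyStatus{true, ""}); }

  // Hypothetical Close(Status): record the final status and close in one call.
  bool Close(ToyStatus final_status) {
    if (closed_) return false;
    final_status_ = std::move(final_status);
    closed_ = true;
    return true;
  }

  bool closed() const { return closed_; }
  const ToyStatus& final_status() const { return final_status_; }
  const std::vector<int>& values() const { return values_; }

 private:
  bool closed_ = false;
  ToyStatus final_status_{true, ""};
  std::vector<int> values_;
};
```

   With such an overload, the error branch in the callback would shrink from 
a `Push(status)` followed by `Close()` to a single call.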

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1126,6 +1127,11 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
 
   template <typename... Args>
   static Status Walk(Args&&... args) {
+    return WalkAsync(std::forward<Args>(args)...).status();

Review comment:
       This pattern got me in trouble with nested deadlock.  Imagine the thread 
pool has 8 threads and we are calling `Walk` 10 times from pool threads: every 
thread can end up blocked waiting on work that itself needs a free pool 
thread.  One potential workaround is to feed this through the new 
`RunSerially`, which avoids nested deadlock but could impact performance 
(although it should be ok here).
   
   Actually, I believe you are ok here.  All tasks spawned by this call will 
run on the I/O executor and do not transfer back.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& sele
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();
+
+    auto fut = impl_->ListBucketsAsync(io_context());
+    auto impl = impl_->shared_from_this();
+    fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+          if (!res.ok()) {
+            producer.Push(res.status());
+            producer.Close();
+            return;
+          }
+          FileInfoVector buckets;
+          for (const auto& bucket : *res) {
+            buckets.push_back(FileInfo{bucket, FileType::Directory});
+          }
+          // Generate all bucket infos
+          producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));

Review comment:
       Why not use `MakeSingleFutureGenerator`?

##########
File path: cpp/src/arrow/filesystem/test_util.cc
##########
@@ -111,6 +113,12 @@ void SortInfos(std::vector<FileInfo>* infos) {
   std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
 }
 
+void AssertFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos) {

Review comment:
       Naming nit: Perhaps don't name this `Assert`, since it isn't actually 
asserting the contents but just collecting them?  I see 
`ASSERT_FINISHES_OK_AND_ASSIGN` as an analogue of `ASSERT_OK_AND_ASSIGN`, which 
is more of a utility assertion and less of a test expectation.

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -159,10 +161,11 @@ Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(
       [paths](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(paths); });
 }
 
-Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(const FileSelector& select) {
-  return FileSystemDefer(
+FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {

Review comment:
       I'm not sure if this is right.  I would have expected 
`AsyncGenerator<FileInfo>` and not `AsyncGenerator<FileInfoVector>`.  The 
latter is still going to block until all results are available.  Is this meant 
to be a step towards the former?
   
   Never mind.  I looked at the S3 generator now and I think I see.  You are 
generating multiple batches of `FileInfo`.
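
   A synchronous toy model of batch-wise consumption, to make the distinction 
concrete.  `Batch`, `BatchGenerator`, `MakeBatchGenerator` and `CollectAll` are 
hypothetical names, strings stand in for `FileInfo`, and an empty batch as the 
end marker is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Batch = std::vector<std::string>;
// Synchronous stand-in for a batch generator: an empty batch marks the end.
using BatchGenerator = std::function<Batch()>;

BatchGenerator MakeBatchGenerator(std::vector<Batch> batches) {
  auto state = std::make_shared<std::vector<Batch>>(std::move(batches));
  auto pos = std::make_shared<std::size_t>(0);
  return [state, pos]() -> Batch {
    if (*pos >= state->size()) return Batch{};
    return (*state)[(*pos)++];
  };
}

// Flattens every batch into one list.  A streaming consumer could instead
// act on each batch as soon as it arrives rather than waiting for all.
std::vector<std::string> CollectAll(BatchGenerator gen) {
  std::vector<std::string> out;
  for (Batch batch = gen(); !batch.empty(); batch = gen()) {
    out.insert(out.end(), batch.begin(), batch.end());
  }
  return out;
}
```

   Each call to the generator hands back one batch, so a consumer is not 
forced to wait for the complete listing before it sees the first entries.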




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

