westonpace commented on a change in pull request #11189:
URL: https://github.com/apache/arrow/pull/11189#discussion_r712604377



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback 
immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) 
mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));

Review comment:
       ```suggestion
                   self->ProcessResult(fut.MoveResult());
   ```
   Can you combine these two lines?  Also, is the `fut` capture just to avoid 
the result copy?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -489,9 +489,13 @@ Result<RecordBatchGenerator> 
ParquetFileFormat::ScanBatchesAsync(
         auto parquet_scan_options,
         GetFragmentScanOptions<ParquetFragmentScanOptions>(
             kParquetTypeName, options.get(), default_fragment_scan_options));
-    ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
-                                              reader, row_groups, 
column_projection,
-                                              
::arrow::internal::GetCpuThreadPool()));
+    // Assume 1 row group corresponds to 1 batch (this factor could be
+    // improved by looking at metadata)
+    int row_group_readahead = options->batch_readahead;
+    ARROW_ASSIGN_OR_RAISE(
+        auto generator, reader->GetRecordBatchGenerator(
+                            reader, row_groups, column_projection,
+                            ::arrow::internal::GetCpuThreadPool(), 
row_group_readahead));
     return MakeReadaheadGenerator(std::move(generator), 
options->batch_readahead);

Review comment:
       Is this redundant at this point since you apply the row group readahead 
in the file?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback 
immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) 
mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));
+              });
+        }
+        return waiting_future;
+      }
+      assert(waiting.size() == 0);
+      auto result = Future<T>::MakeFinished(available.front());
+      available.pop_front();
+      return result;
+    }
+
+    void Purge() {
+      while (!waiting.empty()) {
+        waiting.front().MarkFinished(IterationTraits<T>::End());
+        waiting.pop_front();
+      }
+    }
+
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {
+          error = maybe_result.status();
+          lock.Unlock();
+        } else {
+          lock.Unlock();
+          waiting.front().MarkFinished(maybe_result.status());
+          waiting.pop_front();
+        }
+        Purge();
+        return;
+      }
+
+      util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+      if (!result.has_value()) {
+        finished = true;
+        lock.Unlock();
+        Purge();
+        return;
+      }
+
+      // MarkFinished might run a callback that tries to pull from us,
+      // so just record what needs finishing, and complete the futures
+      // later
+      std::vector<T> delivered = std::move(*result);
+      std::vector<Future<T>> deliver_to;
+      auto it = delivered.begin();
+      while (it != delivered.end() && !waiting.empty()) {
+        deliver_to.push_back(waiting.front());
+        waiting.pop_front();
+        it++;
+      }
+      available.insert(available.end(), std::make_move_iterator(it),
+                       std::make_move_iterator(delivered.end()));
+
+      // If there's still more waiting futures, poll the source again
+      Future<util::optional<std::vector<T>>> fut;
+      waiting_for_results = !waiting.empty();
+      if (waiting_for_results) {
+        fut = inner();
+      }
+
+      lock.Unlock();
+      // Now actually MarkFinished
+      auto next = delivered.begin();
+      for (auto& fut : deliver_to) {
+        fut.MarkFinished(std::move(*next));
+        next++;
+      }
+
+      if (!fut.is_valid()) return;
+      // Must do this outside the lock since if fut is already complete, this 
callback
+      // will run right away
+      auto self = this->shared_from_this();
+      fut.AddCallback([self, fut](const 
Result<util::optional<std::vector<T>>>&) mutable {
+        auto result = fut.MoveResult();
+        self->ProcessResult(std::move(result));
+      });
+    }
+
+    AsyncGenerator<util::optional<std::vector<T>>> inner;
+    util::Mutex mutex;
+    std::deque<Future<T>> waiting;
+    std::deque<T> available;
+    Status error;
+    // Are we finished polling the source?
+    bool finished = false;
+    // Do we have a request in flight for the source?
+    bool waiting_for_results = false;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream 
of T.
+///
+/// This generator is async-reentrant but will never pull from source 
reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.

Review comment:
       Is this much different from 
`MakeConcatenatedGenerator(MakeMappedGenerator(generator_of_vectors, 
MakeVectorGenerator))`?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback 
immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) 
mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));
+              });
+        }
+        return waiting_future;
+      }
+      assert(waiting.size() == 0);
+      auto result = Future<T>::MakeFinished(available.front());
+      available.pop_front();
+      return result;
+    }
+
+    void Purge() {
+      while (!waiting.empty()) {
+        waiting.front().MarkFinished(IterationTraits<T>::End());
+        waiting.pop_front();
+      }
+    }
+
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {

Review comment:
       No harm in having this case, but I'm not sure whether `waiting` can be 
empty here.

##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -461,6 +461,68 @@ TEST(TestAsyncUtil, Concatenated) {
   AssertAsyncGeneratorMatch(expected, concat);
 }
 
+TEST(TestAsyncUtil, ConcatenatedVector) {

Review comment:
       Given the locks you have it would be good to have at least one parallel 
test using a readahead generator.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback 
immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) 
mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));
+              });
+        }
+        return waiting_future;
+      }
+      assert(waiting.size() == 0);
+      auto result = Future<T>::MakeFinished(available.front());
+      available.pop_front();
+      return result;
+    }
+
+    void Purge() {
+      while (!waiting.empty()) {
+        waiting.front().MarkFinished(IterationTraits<T>::End());
+        waiting.pop_front();
+      }
+    }
+
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {
+          error = maybe_result.status();
+          lock.Unlock();
+        } else {
+          lock.Unlock();
+          waiting.front().MarkFinished(maybe_result.status());
+          waiting.pop_front();
+        }
+        Purge();
+        return;
+      }
+
+      util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+      if (!result.has_value()) {
+        finished = true;
+        lock.Unlock();
+        Purge();
+        return;
+      }
+
+      // MarkFinished might run a callback that tries to pull from us,
+      // so just record what needs finishing, and complete the futures
+      // later
+      std::vector<T> delivered = std::move(*result);
+      std::vector<Future<T>> deliver_to;
+      auto it = delivered.begin();
+      while (it != delivered.end() && !waiting.empty()) {
+        deliver_to.push_back(waiting.front());
+        waiting.pop_front();
+        it++;
+      }
+      available.insert(available.end(), std::make_move_iterator(it),
+                       std::make_move_iterator(delivered.end()));
+
+      // If there's still more waiting futures, poll the source again
+      Future<util::optional<std::vector<T>>> fut;
+      waiting_for_results = !waiting.empty();
+      if (waiting_for_results) {
+        fut = inner();
+      }
+
+      lock.Unlock();
+      // Now actually MarkFinished
+      auto next = delivered.begin();
+      for (auto& fut : deliver_to) {
+        fut.MarkFinished(std::move(*next));
+        next++;
+      }
+
+      if (!fut.is_valid()) return;
+      // Must do this outside the lock since if fut is already complete, this 
callback
+      // will run right away
+      auto self = this->shared_from_this();
+      fut.AddCallback([self, fut](const 
Result<util::optional<std::vector<T>>>&) mutable {
+        auto result = fut.MoveResult();
+        self->ProcessResult(std::move(result));
+      });
+    }
+
+    AsyncGenerator<util::optional<std::vector<T>>> inner;
+    util::Mutex mutex;
+    std::deque<Future<T>> waiting;
+    std::deque<T> available;
+    Status error;
+    // Are we finished polling the source?
+    bool finished = false;
+    // Do we have a request in flight for the source?
+    bool waiting_for_results = false;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream 
of T.
+///
+/// This generator is async-reentrant but will never pull from source 
reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.
+template <typename T>
+AsyncGenerator<T> MakeConcatenatedVectorGenerator(

Review comment:
       Maybe `MakeFlattenGenerator` to match `MakeFlattenIterator`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to