lidavidm commented on a change in pull request #11189:
URL: https://github.com/apache/arrow/pull/11189#discussion_r713334535



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/** \brief See MakeConcatenatedVectorGenerator
+ *
+ * Flattens an async stream of optional<vector<T>> into an async stream of T.
+ * A disengaged optional from the source ends the stream. A source error goes
+ * to the oldest waiting consumer (or is stored and returned by the next Pull()
+ * if none is waiting); any other waiting consumers receive end-of-stream.
+ *
+ * The mutable state lives in a shared State object because the callbacks
+ * registered on the source's futures capture a shared_ptr to it.
+ */
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+  /* Returns a future for the next item. Calls may overlap: each call gets its
+   * own future, and items are assigned to pending futures in call order. */
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+    /* Hands out the next item. If one is buffered, returns an already-finished
+     * future; otherwise queues a pending future and, unless a source request
+     * is already in flight, issues one new request to `inner`. Once finished,
+     * returns the stored error (if any) or end-of-stream. */
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);  // NOTE(review): assumes a moved-from Status is OK, so the error is reported once; confirm
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+        // At most one request to the source is outstanding at a time.
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback 
immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) 
mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));
+              });
+        }
+        return waiting_future;
+      }
+      assert(waiting.size() == 0);  // items are buffered only after every waiter was served
+      auto result = Future<T>::MakeFinished(available.front());  // NOTE(review): copies; std::move(available.front()) would avoid it
+      available.pop_front();
+      return result;
+    }
+    /* Completes every still-waiting future with end-of-stream. Called with the
+     * mutex released, after `finished` was set; Pull() returns early once
+     * finished, so it no longer touches `waiting`. */
+    void Purge() {
+      while (!waiting.empty()) {
+        waiting.front().MarkFinished(IterationTraits<T>::End());
+        waiting.pop_front();
+      }
+    }
+    /* Consumes the result of the single in-flight source request: hands items
+     * to waiting futures in FIFO order, buffers any surplus in `available`,
+     * and re-polls the source while futures are still waiting. An error or a
+     * disengaged optional marks the generator finished.
+     *
+     * NOTE(review): if `inner` keeps returning already-completed futures while
+     * consumers are waiting (e.g. empty vectors), AddCallback runs this method
+     * synchronously, so recursion depth grows with each such poll; consider
+     * looping instead. */
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;  // the single in-flight request has completed
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {
+          error = maybe_result.status();
+          lock.Unlock();
+        } else {
+          lock.Unlock();
+          waiting.front().MarkFinished(maybe_result.status());  // unlocked access is OK: finished is set, so Pull() no longer touches waiting
+          waiting.pop_front();
+        }
+        Purge();
+        return;
+      }
+      // Success: a disengaged optional means the source is exhausted.
+      util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+      if (!result.has_value()) {
+        finished = true;
+        lock.Unlock();
+        Purge();
+        return;
+      }
+
+      // MarkFinished might run a callback that tries to pull from us,
+      // so just record what needs finishing, and complete the futures
+      // later
+      std::vector<T> delivered = std::move(*result);
+      std::vector<Future<T>> deliver_to;
+      auto it = delivered.begin();
+      while (it != delivered.end() && !waiting.empty()) {
+        deliver_to.push_back(waiting.front());
+        waiting.pop_front();
+        it++;
+      }
+      available.insert(available.end(), std::make_move_iterator(it),
+                       std::make_move_iterator(delivered.end()));
+
+      // If there's still more waiting futures, poll the source again
+      Future<util::optional<std::vector<T>>> fut;
+      waiting_for_results = !waiting.empty();
+      if (waiting_for_results) {
+        fut = inner();
+      }
+
+      lock.Unlock();
+      // Now actually MarkFinished
+      auto next = delivered.begin();
+      for (auto& fut : deliver_to) {  // NOTE(review): shadows the outer `fut`
+        fut.MarkFinished(std::move(*next));
+        next++;
+      }
+
+      if (!fut.is_valid()) return;
+      // Must do this outside the lock since if fut is already complete, this 
callback
+      // will run right away
+      auto self = this->shared_from_this();
+      fut.AddCallback([self, fut](const 
Result<util::optional<std::vector<T>>>&) mutable {
+        auto result = fut.MoveResult();
+        self->ProcessResult(std::move(result));
+      });
+    }
+    // Fields below are guarded by `mutex`; they are touched unlocked only after `finished` is set.
+    AsyncGenerator<util::optional<std::vector<T>>> inner;  // source of item batches
+    util::Mutex mutex;
+    std::deque<Future<T>> waiting;  // futures from Pull() still awaiting an item (FIFO)
+    std::deque<T> available;  // items received from the source but not yet requested
+    Status error;  // source error seen while no consumer was waiting
+    // Are we finished polling the source?
+    bool finished = false;
+    // Do we have a request in flight for the source?
+    bool waiting_for_results = false;
+  };
+
+  std::shared_ptr<State> state_;  // shared with callbacks on in-flight source futures
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream 
of T.
+///
+/// This generator is async-reentrant but will never pull from source 
reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.

Review comment:
       Alright, it seems to be about the same, so I went with that solution — 
it saves us both work! (This is effectively what you suggested in JIRA: we simply 
implement batch readahead as row-group readahead.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to