lidavidm commented on a change in pull request #11189:
URL: https://github.com/apache/arrow/pull/11189#discussion_r712967234
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T>
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+/// \brief See MakeConcatenatedVectorGenerator. Callable that yields one T per call from an inner generator of optional<vector<T>>.
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+ explicit ConcatenatedVectorGenerator(
+ AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : state_(std::make_shared<State>(std::move(inner))) {}
+
+ Future<T> operator()() { return state_->Pull(); }  // every pull is forwarded to the shared State
+
+ private:
+ struct State : public std::enable_shared_from_this<State> {  // shared_from_this() is used below so callbacks can hold a reference to the State
+ explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : inner(std::move(inner)) {}
+
+ Future<T> Pull() {  // returns a future for the next item, an error, or the end marker
+ auto guard = mutex.Lock();  // held while the fields below are read/modified
+ if (finished) {  // source already ended or failed: answer without calling inner
+ if (!error.ok()) {
+ return std::move(error);  // NOTE(review): error is moved out here, so a later Pull presumably sees an OK status and returns End - confirm
+ }
+ return IterationTraits<T>::End();
+ }
+ if (available.empty()) {  // nothing buffered: queue a future for this caller
+ auto waiting_future = Future<T>::Make();
+ waiting.push_back(waiting_future);
+
+ if (!waiting_for_results) {  // only start a request to inner if none is in flight
+ waiting_for_results = true;
+ auto fut = inner();
+ // If fut is already complete, this may end up running the callback
immediately,
+ // so unlock first
+ guard.Unlock();
+ auto self = this->shared_from_this();
+ fut.AddCallback(  // the lambda captures self (a shared_ptr to this State) and a copy of fut
+ [self, fut](const Result<util::optional<std::vector<T>>>&)
mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+ return waiting_future;
+ }
+ assert(waiting.size() == 0);  // buffered items and queued waiters are not expected to coexist
+ auto result = Future<T>::MakeFinished(available.front());  // front() is passed as an lvalue (presumably copied, not moved)
+ available.pop_front();
+ return result;
+ }
+
+ void Purge() {  // completes every queued future with the end marker; both call sites release the mutex first
+ while (!waiting.empty()) {
+ waiting.front().MarkFinished(IterationTraits<T>::End());
+ waiting.pop_front();
+ }
+ }
+
+ void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {  // handles one result from inner: error, empty optional, or a vector of items
+ auto lock = mutex.Lock();
+ waiting_for_results = false;  // the request that produced maybe_result is no longer in flight
+ // We should never call MarkFinished while holding the lock as a
+ // callback may call back into this generator
+ if (!maybe_result.ok()) {  // error path: mark finished, report the error once, end everyone else
+ finished = true;
+ if (waiting.empty()) {
+ error = maybe_result.status();  // no waiter to receive it: store the error for a later Pull
+ lock.Unlock();
+ } else {
+ lock.Unlock();
+ waiting.front().MarkFinished(maybe_result.status());  // the oldest waiter receives the error
+ waiting.pop_front();
+ }
+ Purge();
+ return;
+ }
+
+ util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+ if (!result.has_value()) {  // an empty optional is handled as the end of the source
+ finished = true;
+ lock.Unlock();
+ Purge();
+ return;
+ }
+
+ // MarkFinished might run a callback that tries to pull from us,
+ // so just record what needs finishing, and complete the futures
+ // later
+ std::vector<T> delivered = std::move(*result);
+ std::vector<Future<T>> deliver_to;
+ auto it = delivered.begin();
+ while (it != delivered.end() && !waiting.empty()) {  // pair leading items with waiters, oldest waiter first
+ deliver_to.push_back(waiting.front());
+ waiting.pop_front();
+ it++;
+ }
+ available.insert(available.end(), std::make_move_iterator(it),  // items beyond the paired ones are buffered for later Pull calls
+ std::make_move_iterator(delivered.end()));
+
+ // If there's still more waiting futures, poll the source again
+ Future<util::optional<std::vector<T>>> fut;
+ waiting_for_results = !waiting.empty();
+ if (waiting_for_results) {
+ fut = inner();
+ }
+
+ lock.Unlock();
+ // Now actually MarkFinished
+ auto next = delivered.begin();
+ for (auto& fut : deliver_to) {  // NOTE: this loop variable shadows the outer fut declared above
+ fut.MarkFinished(std::move(*next));
+ next++;
+ }
+
+ if (!fut.is_valid()) return;  // fut is only assigned above when inner was polled again
+ // Must do this outside the lock since if fut is already complete, this
callback
+ // will run right away
+ auto self = this->shared_from_this();
+ fut.AddCallback([self, fut](const
Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+
+ AsyncGenerator<util::optional<std::vector<T>>> inner;  // the generator being flattened
+ util::Mutex mutex;  // locked at the top of Pull and ProcessResult
+ std::deque<Future<T>> waiting;  // futures handed to callers that have not received a value yet
+ std::deque<T> available;  // items received from inner that no caller has taken yet
+ Status error;  // set when inner fails while no future is waiting; returned by a later Pull
+ // Are we finished polling the source?
+ bool finished = false;
+ // Do we have a request in flight for the source?
+ bool waiting_for_results = false;
+ };
+
+ std::shared_ptr<State> state_;  // created in the constructor with make_shared
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream
of T.
+///
+/// This generator is async-reentrant but will never pull from source
reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.
Review comment:
Actually, now that you point that out… I suppose there's one layer of
indirection that this gets rid of (once the next vector arrives, any pending
futures will get fulfilled right away, instead of going through the callbacks).
But I'll check whether this actually affects performance and make sure we get
parallelism; if not, we should just use the existing ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]