westonpace commented on a change in pull request #11189:
URL: https://github.com/apache/arrow/pull/11189#discussion_r712604377
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));

Review comment:
```suggestion
                self->ProcessResult(fut.MoveResult());
```
Can you combine these two lines? Also, is the `fut` capture just to avoid the result copy?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -489,9 +489,13 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
       auto parquet_scan_options,
       GetFragmentScanOptions<ParquetFragmentScanOptions>(
           kParquetTypeName, options.get(), default_fragment_scan_options));
-  ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
-                                            reader, row_groups, column_projection,
-                                            ::arrow::internal::GetCpuThreadPool()));
+  // Assume 1 row group corresponds to 1 batch (this factor could be
+  // improved by looking at metadata)
+  int row_group_readahead = options->batch_readahead;
+  ARROW_ASSIGN_OR_RAISE(
+      auto generator, reader->GetRecordBatchGenerator(
+                          reader, row_groups, column_projection,
+                          ::arrow::internal::GetCpuThreadPool(), row_group_readahead));
   return MakeReadaheadGenerator(std::move(generator), options->batch_readahead);

Review comment:
Is this redundant at this point since you apply the row group readahead in the file?
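To make the redundancy question concrete, here is a minimal sketch (not part of the original review) of what the return path could look like if the outer wrapper turned out to be redundant, assuming the row-group readahead passed to `GetRecordBatchGenerator` already provides the desired buffering:

```cpp
// Hypothetical sketch only: if GetRecordBatchGenerator(..., row_group_readahead)
// already reads ahead internally, the extra MakeReadaheadGenerator wrapper might
// be unnecessary and the generator could be returned directly.
ARROW_ASSIGN_OR_RAISE(
    auto generator, reader->GetRecordBatchGenerator(
                        reader, row_groups, column_projection,
                        ::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
```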
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+  explicit ConcatenatedVectorGenerator(
+      AsyncGenerator<util::optional<std::vector<T>>> inner)
+      : state_(std::make_shared<State>(std::move(inner))) {}
+
+  Future<T> operator()() { return state_->Pull(); }
+
+ private:
+  struct State : public std::enable_shared_from_this<State> {
+    explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+        : inner(std::move(inner)) {}
+
+    Future<T> Pull() {
+      auto guard = mutex.Lock();
+      if (finished) {
+        if (!error.ok()) {
+          return std::move(error);
+        }
+        return IterationTraits<T>::End();
+      }
+      if (available.empty()) {
+        auto waiting_future = Future<T>::Make();
+        waiting.push_back(waiting_future);
+
+        if (!waiting_for_results) {
+          waiting_for_results = true;
+          auto fut = inner();
+          // If fut is already complete, this may end up running the callback immediately,
+          // so unlock first
+          guard.Unlock();
+          auto self = this->shared_from_this();
+          fut.AddCallback(
+              [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+                auto result = fut.MoveResult();
+                self->ProcessResult(std::move(result));
+              });
+        }
+        return waiting_future;
+      }
+      assert(waiting.size() == 0);
+      auto result = Future<T>::MakeFinished(available.front());
+      available.pop_front();
+      return result;
+    }
+
+    void Purge() {
+      while (!waiting.empty()) {
+        waiting.front().MarkFinished(IterationTraits<T>::End());
+        waiting.pop_front();
+      }
+    }
+
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {
+          error = maybe_result.status();
+          lock.Unlock();
+        } else {
+          lock.Unlock();
+          waiting.front().MarkFinished(maybe_result.status());
+          waiting.pop_front();
+        }
+        Purge();
+        return;
+      }
+
+      util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+      if (!result.has_value()) {
+        finished = true;
+        lock.Unlock();
+        Purge();
+        return;
+      }
+
+      // MarkFinished might run a callback that tries to pull from us,
+      // so just record what needs finishing, and complete the futures
+      // later
+      std::vector<T> delivered = std::move(*result);
+      std::vector<Future<T>> deliver_to;
+      auto it = delivered.begin();
+      while (it != delivered.end() && !waiting.empty()) {
+        deliver_to.push_back(waiting.front());
+        waiting.pop_front();
+        it++;
+      }
+      available.insert(available.end(), std::make_move_iterator(it),
+                       std::make_move_iterator(delivered.end()));
+
+      // If there's still more waiting futures, poll the source again
+      Future<util::optional<std::vector<T>>> fut;
+      waiting_for_results = !waiting.empty();
+      if (waiting_for_results) {
+        fut = inner();
+      }
+
+      lock.Unlock();
+      // Now actually MarkFinished
+      auto next = delivered.begin();
+      for (auto& fut : deliver_to) {
+        fut.MarkFinished(std::move(*next));
+        next++;
+      }
+
+      if (!fut.is_valid()) return;
+      // Must do this outside the lock since if fut is already complete, this callback
+      // will run right away
+      auto self = this->shared_from_this();
+      fut.AddCallback([self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+        auto result = fut.MoveResult();
+        self->ProcessResult(std::move(result));
+      });
+    }
+
+    AsyncGenerator<util::optional<std::vector<T>>> inner;
+    util::Mutex mutex;
+    std::deque<Future<T>> waiting;
+    std::deque<T> available;
+    Status error;
+    // Are we finished polling the source?
+    bool finished = false;
+    // Do we have a request in flight for the source?
+    bool waiting_for_results = false;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream of T.
+///
+/// This generator is async-reentrant but will never pull from source reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.

Review comment:
Is this much different than `MakeConcatenatedGenerator(MakeMappedGenerator(generator_of_vectors, MakeVectorGenerator))`?
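For reference, a sketch of the composition the comment alludes to (the function name `FlattenVectorsViaComposition` is hypothetical; the `util::optional` wrapping used by the PR is omitted for brevity, and it is assumed that `MakeMappedGenerator` accepts a generator-valued mapping):

```cpp
#include <utility>
#include <vector>

#include "arrow/util/async_generator.h"

namespace arrow {

// Sketch of the suggested composition: map each vector<T> to its own
// AsyncGenerator<T>, then concatenate the resulting stream of generators.
template <typename T>
AsyncGenerator<T> FlattenVectorsViaComposition(AsyncGenerator<std::vector<T>> source) {
  auto vector_to_generator = [](const std::vector<T>& vec) -> AsyncGenerator<T> {
    return MakeVectorGenerator(vec);
  };
  return MakeConcatenatedGenerator(
      MakeMappedGenerator(std::move(source), std::move(vector_to_generator)));
}

}  // namespace arrow
```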
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
[... same hunk as quoted in full above, up to the following lines ...]
+    void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+      auto lock = mutex.Lock();
+      waiting_for_results = false;
+      // We should never call MarkFinished while holding the lock as a
+      // callback may call back into this generator
+      if (!maybe_result.ok()) {
+        finished = true;
+        if (waiting.empty()) {

Review comment:
No harm in having this case but I'm not sure if `waiting` can be empty here.

##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -461,6 +461,68 @@ TEST(TestAsyncUtil, Concatenated) {
   AssertAsyncGeneratorMatch(expected, concat);
 }
 
+TEST(TestAsyncUtil, ConcatenatedVector) {

Review comment:
Given the locks you have it would be good to have at least one parallel test using a readahead generator.
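A minimal sketch of what such a parallel test could look like (hypothetical; `TestInt`, `CollectAsyncGenerator`, `MakeTransferredGenerator`, and `ASSERT_FINISHES_OK_AND_ASSIGN` are assumed to be available from the existing test utilities):

```cpp
// Hypothetical test sketch: complete the source on the CPU thread pool and
// pull through a readahead generator so the flattening generator's lock is
// exercised from more than one thread.
TEST(TestAsyncUtil, ConcatenatedVectorWithReadahead) {
  std::vector<util::optional<std::vector<TestInt>>> vectors;
  for (int i = 0; i < 10; i++) {
    vectors.push_back(std::vector<TestInt>{TestInt(i * 2), TestInt(i * 2 + 1)});
  }
  // Transfer the source to the CPU pool so downstream callbacks run off-thread.
  auto source = MakeTransferredGenerator(MakeVectorGenerator(std::move(vectors)),
                                         ::arrow::internal::GetCpuThreadPool());
  auto flattened = MakeConcatenatedVectorGenerator(std::move(source));
  // Readahead pulls from the flattening generator reentrantly.
  auto readahead = MakeReadaheadGenerator(std::move(flattened), /*max_readahead=*/4);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto actual,
                                CollectAsyncGenerator(std::move(readahead)));
  ASSERT_EQ(actual.size(), static_cast<size_t>(20));
}
```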
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
[... same hunk as quoted in full above, up to the following lines ...]
+/// \brief Creates a generator that converts a stream of vector<T> to a stream of T.
+///
+/// This generator is async-reentrant but will never pull from source reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.
+template <typename T>
+AsyncGenerator<T> MakeConcatenatedVectorGenerator(

Review comment:
Maybe `MakeFlattenGenerator` to match `MakeFlattenIterator`?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org