westonpace commented on a change in pull request #11189:
URL: https://github.com/apache/arrow/pull/11189#discussion_r712604377
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+ explicit ConcatenatedVectorGenerator(
+ AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : state_(std::make_shared<State>(std::move(inner))) {}
+
+ Future<T> operator()() { return state_->Pull(); }
+
+ private:
+ struct State : public std::enable_shared_from_this<State> {
+ explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : inner(std::move(inner)) {}
+
+ Future<T> Pull() {
+ auto guard = mutex.Lock();
+ if (finished) {
+ if (!error.ok()) {
+ return std::move(error);
+ }
+ return IterationTraits<T>::End();
+ }
+ if (available.empty()) {
+ auto waiting_future = Future<T>::Make();
+ waiting.push_back(waiting_future);
+
+ if (!waiting_for_results) {
+ waiting_for_results = true;
+ auto fut = inner();
+ // If fut is already complete, this may end up running the callback immediately,
+ // so unlock first
+ guard.Unlock();
+ auto self = this->shared_from_this();
+ fut.AddCallback(
+ [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
Review comment:
```suggestion
self->ProcessResult(fut.MoveResult());
```
Can you combine these two lines? Also, is the `fut` capture just to avoid
the result copy?
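
For illustration, a hedged sketch of the alternative the second question hints at, assuming `AddCallback` only hands the callback the `const Result<...>&` parameter shown in the diff; without the `fut` capture the result cannot be moved out, so `ProcessResult` (which takes its argument by value) would copy the vector:

```cpp
// Hedged sketch only: the same callback without capturing `fut`. The const
// reference parameter cannot be moved from, so ProcessResult receives a copy
// of the underlying vector instead of a moved-out result.
fut.AddCallback(
    [self](const Result<util::optional<std::vector<T>>>& result) {
      self->ProcessResult(result);  // copies the vector
    });
```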
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -489,9 +489,13 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
kParquetTypeName, options.get(), default_fragment_scan_options));
- ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
- reader, row_groups, column_projection,
- ::arrow::internal::GetCpuThreadPool()));
+ // Assume 1 row group corresponds to 1 batch (this factor could be
+ // improved by looking at metadata)
+ int row_group_readahead = options->batch_readahead;
+ ARROW_ASSIGN_OR_RAISE(
+ auto generator, reader->GetRecordBatchGenerator(
+ reader, row_groups, column_projection,
+ ::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return MakeReadaheadGenerator(std::move(generator), options->batch_readahead);
Review comment:
Is this redundant at this point since you apply the row group readahead
in the file?
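
For illustration only, a hedged sketch of what the body would look like if the outer wrapper really is redundant, i.e. if the row-group readahead passed to `GetRecordBatchGenerator` already provides the desired pipelining:

```cpp
// Hedged sketch: rely on the reader's row-group readahead and return its
// generator directly instead of wrapping it in a second readahead layer.
int row_group_readahead = options->batch_readahead;
ARROW_ASSIGN_OR_RAISE(
    auto generator, reader->GetRecordBatchGenerator(
                        reader, row_groups, column_projection,
                        ::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
```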
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+ explicit ConcatenatedVectorGenerator(
+ AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : state_(std::make_shared<State>(std::move(inner))) {}
+
+ Future<T> operator()() { return state_->Pull(); }
+
+ private:
+ struct State : public std::enable_shared_from_this<State> {
+ explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : inner(std::move(inner)) {}
+
+ Future<T> Pull() {
+ auto guard = mutex.Lock();
+ if (finished) {
+ if (!error.ok()) {
+ return std::move(error);
+ }
+ return IterationTraits<T>::End();
+ }
+ if (available.empty()) {
+ auto waiting_future = Future<T>::Make();
+ waiting.push_back(waiting_future);
+
+ if (!waiting_for_results) {
+ waiting_for_results = true;
+ auto fut = inner();
+ // If fut is already complete, this may end up running the callback immediately,
+ // so unlock first
+ guard.Unlock();
+ auto self = this->shared_from_this();
+ fut.AddCallback(
+ [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+ return waiting_future;
+ }
+ assert(waiting.size() == 0);
+ auto result = Future<T>::MakeFinished(available.front());
+ available.pop_front();
+ return result;
+ }
+
+ void Purge() {
+ while (!waiting.empty()) {
+ waiting.front().MarkFinished(IterationTraits<T>::End());
+ waiting.pop_front();
+ }
+ }
+
+ void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+ auto lock = mutex.Lock();
+ waiting_for_results = false;
+ // We should never call MarkFinished while holding the lock as a
+ // callback may call back into this generator
+ if (!maybe_result.ok()) {
+ finished = true;
+ if (waiting.empty()) {
+ error = maybe_result.status();
+ lock.Unlock();
+ } else {
+ lock.Unlock();
+ waiting.front().MarkFinished(maybe_result.status());
+ waiting.pop_front();
+ }
+ Purge();
+ return;
+ }
+
+ util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+ if (!result.has_value()) {
+ finished = true;
+ lock.Unlock();
+ Purge();
+ return;
+ }
+
+ // MarkFinished might run a callback that tries to pull from us,
+ // so just record what needs finishing, and complete the futures
+ // later
+ std::vector<T> delivered = std::move(*result);
+ std::vector<Future<T>> deliver_to;
+ auto it = delivered.begin();
+ while (it != delivered.end() && !waiting.empty()) {
+ deliver_to.push_back(waiting.front());
+ waiting.pop_front();
+ it++;
+ }
+ available.insert(available.end(), std::make_move_iterator(it),
+ std::make_move_iterator(delivered.end()));
+
+ // If there's still more waiting futures, poll the source again
+ Future<util::optional<std::vector<T>>> fut;
+ waiting_for_results = !waiting.empty();
+ if (waiting_for_results) {
+ fut = inner();
+ }
+
+ lock.Unlock();
+ // Now actually MarkFinished
+ auto next = delivered.begin();
+ for (auto& fut : deliver_to) {
+ fut.MarkFinished(std::move(*next));
+ next++;
+ }
+
+ if (!fut.is_valid()) return;
+ // Must do this outside the lock since if fut is already complete, this callback
+ // will run right away
+ auto self = this->shared_from_this();
+ fut.AddCallback([self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+
+ AsyncGenerator<util::optional<std::vector<T>>> inner;
+ util::Mutex mutex;
+ std::deque<Future<T>> waiting;
+ std::deque<T> available;
+ Status error;
+ // Are we finished polling the source?
+ bool finished = false;
+ // Do we have a request in flight for the source?
+ bool waiting_for_results = false;
+ };
+
+ std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream of T.
+///
+/// This generator is async-reentrant but will never pull from source reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.
Review comment:
Is this much different than `MakeConcatenatedGenerator(MakeMappedGenerator(generator_of_vectors, MakeVectorGenerator))`?
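
For reference, a hedged sketch of that composition. It assumes the source yields `util::optional<std::vector<T>>` as in this PR, so a small lambda unwraps the optional before handing the vector to `MakeVectorGenerator`; the function name below is made up for illustration and exact helper signatures may differ:

```cpp
// Hedged sketch of composing existing generators instead of adding a new one:
// each vector becomes its own AsyncGenerator<T>, and MakeConcatenatedGenerator
// flattens the resulting generator-of-generators without reentrant pulls on
// the source.
template <typename T>
AsyncGenerator<T> ConcatenateVectorsViaComposition(
    AsyncGenerator<util::optional<std::vector<T>>> source) {
  auto vector_to_generator =
      [](const util::optional<std::vector<T>>& maybe_vec) -> AsyncGenerator<T> {
    if (!maybe_vec.has_value()) return MakeEmptyGenerator<T>();
    return MakeVectorGenerator(*maybe_vec);  // copies the vector
  };
  return MakeConcatenatedGenerator(
      MakeMappedGenerator(std::move(source), std::move(vector_to_generator)));
}
```

The visible differences would be one inner generator allocated per vector and the copy noted above.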
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+ explicit ConcatenatedVectorGenerator(
+ AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : state_(std::make_shared<State>(std::move(inner))) {}
+
+ Future<T> operator()() { return state_->Pull(); }
+
+ private:
+ struct State : public std::enable_shared_from_this<State> {
+ explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : inner(std::move(inner)) {}
+
+ Future<T> Pull() {
+ auto guard = mutex.Lock();
+ if (finished) {
+ if (!error.ok()) {
+ return std::move(error);
+ }
+ return IterationTraits<T>::End();
+ }
+ if (available.empty()) {
+ auto waiting_future = Future<T>::Make();
+ waiting.push_back(waiting_future);
+
+ if (!waiting_for_results) {
+ waiting_for_results = true;
+ auto fut = inner();
+ // If fut is already complete, this may end up running the callback immediately,
+ // so unlock first
+ guard.Unlock();
+ auto self = this->shared_from_this();
+ fut.AddCallback(
+ [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+ return waiting_future;
+ }
+ assert(waiting.size() == 0);
+ auto result = Future<T>::MakeFinished(available.front());
+ available.pop_front();
+ return result;
+ }
+
+ void Purge() {
+ while (!waiting.empty()) {
+ waiting.front().MarkFinished(IterationTraits<T>::End());
+ waiting.pop_front();
+ }
+ }
+
+ void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+ auto lock = mutex.Lock();
+ waiting_for_results = false;
+ // We should never call MarkFinished while holding the lock as a
+ // callback may call back into this generator
+ if (!maybe_result.ok()) {
+ finished = true;
+ if (waiting.empty()) {
Review comment:
No harm in having this case but I'm not sure if `waiting` can be empty
here.
##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -461,6 +461,68 @@ TEST(TestAsyncUtil, Concatenated) {
AssertAsyncGeneratorMatch(expected, concat);
}
+TEST(TestAsyncUtil, ConcatenatedVector) {
Review comment:
Given the locks you have it would be good to have at least one parallel
test using a readahead generator.
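
A hedged sketch of the kind of test this suggests; `MakeBackgroundGenerator`, `CollectAsyncGenerator`, and the `ASSERT_FINISHES_OK_AND_ASSIGN` / `ASSERT_OK_AND_ASSIGN` macros are assumed to be the existing utilities available in this test file, and the test name is made up:

```cpp
// Hedged sketch: produce the vectors from a thread pool via a background
// generator and pull through a readahead generator so that Pull() and
// ProcessResult() can race on the internal mutex.
TEST(TestAsyncUtil, ConcatenatedVectorWithReadahead) {
  std::vector<util::optional<std::vector<int>>> source_vectors = {
      std::vector<int>{1, 2, 3}, std::vector<int>{4, 5}, std::vector<int>{6}};
  ASSERT_OK_AND_ASSIGN(
      auto background,
      MakeBackgroundGenerator(MakeVectorIterator(std::move(source_vectors)),
                              ::arrow::internal::GetCpuThreadPool()));
  auto concat = MakeConcatenatedVectorGenerator(std::move(background));
  auto readahead = MakeReadaheadGenerator(std::move(concat), /*max_readahead=*/4);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto actual,
                                CollectAsyncGenerator(std::move(readahead)));
  EXPECT_EQ(actual, (std::vector<int>{1, 2, 3, 4, 5, 6}));
}
```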
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1116,6 +1116,151 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+/// \brief See MakeConcatenatedVectorGenerator
+template <typename T>
+class ConcatenatedVectorGenerator {
+ public:
+ explicit ConcatenatedVectorGenerator(
+ AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : state_(std::make_shared<State>(std::move(inner))) {}
+
+ Future<T> operator()() { return state_->Pull(); }
+
+ private:
+ struct State : public std::enable_shared_from_this<State> {
+ explicit State(AsyncGenerator<util::optional<std::vector<T>>> inner)
+ : inner(std::move(inner)) {}
+
+ Future<T> Pull() {
+ auto guard = mutex.Lock();
+ if (finished) {
+ if (!error.ok()) {
+ return std::move(error);
+ }
+ return IterationTraits<T>::End();
+ }
+ if (available.empty()) {
+ auto waiting_future = Future<T>::Make();
+ waiting.push_back(waiting_future);
+
+ if (!waiting_for_results) {
+ waiting_for_results = true;
+ auto fut = inner();
+ // If fut is already complete, this may end up running the callback immediately,
+ // so unlock first
+ guard.Unlock();
+ auto self = this->shared_from_this();
+ fut.AddCallback(
+ [self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+ return waiting_future;
+ }
+ assert(waiting.size() == 0);
+ auto result = Future<T>::MakeFinished(available.front());
+ available.pop_front();
+ return result;
+ }
+
+ void Purge() {
+ while (!waiting.empty()) {
+ waiting.front().MarkFinished(IterationTraits<T>::End());
+ waiting.pop_front();
+ }
+ }
+
+ void ProcessResult(Result<util::optional<std::vector<T>>> maybe_result) {
+ auto lock = mutex.Lock();
+ waiting_for_results = false;
+ // We should never call MarkFinished while holding the lock as a
+ // callback may call back into this generator
+ if (!maybe_result.ok()) {
+ finished = true;
+ if (waiting.empty()) {
+ error = maybe_result.status();
+ lock.Unlock();
+ } else {
+ lock.Unlock();
+ waiting.front().MarkFinished(maybe_result.status());
+ waiting.pop_front();
+ }
+ Purge();
+ return;
+ }
+
+ util::optional<std::vector<T>> result = maybe_result.MoveValueUnsafe();
+ if (!result.has_value()) {
+ finished = true;
+ lock.Unlock();
+ Purge();
+ return;
+ }
+
+ // MarkFinished might run a callback that tries to pull from us,
+ // so just record what needs finishing, and complete the futures
+ // later
+ std::vector<T> delivered = std::move(*result);
+ std::vector<Future<T>> deliver_to;
+ auto it = delivered.begin();
+ while (it != delivered.end() && !waiting.empty()) {
+ deliver_to.push_back(waiting.front());
+ waiting.pop_front();
+ it++;
+ }
+ available.insert(available.end(), std::make_move_iterator(it),
+ std::make_move_iterator(delivered.end()));
+
+ // If there's still more waiting futures, poll the source again
+ Future<util::optional<std::vector<T>>> fut;
+ waiting_for_results = !waiting.empty();
+ if (waiting_for_results) {
+ fut = inner();
+ }
+
+ lock.Unlock();
+ // Now actually MarkFinished
+ auto next = delivered.begin();
+ for (auto& fut : deliver_to) {
+ fut.MarkFinished(std::move(*next));
+ next++;
+ }
+
+ if (!fut.is_valid()) return;
+ // Must do this outside the lock since if fut is already complete, this callback
+ // will run right away
+ auto self = this->shared_from_this();
+ fut.AddCallback([self, fut](const Result<util::optional<std::vector<T>>>&) mutable {
+ auto result = fut.MoveResult();
+ self->ProcessResult(std::move(result));
+ });
+ }
+
+ AsyncGenerator<util::optional<std::vector<T>>> inner;
+ util::Mutex mutex;
+ std::deque<Future<T>> waiting;
+ std::deque<T> available;
+ Status error;
+ // Are we finished polling the source?
+ bool finished = false;
+ // Do we have a request in flight for the source?
+ bool waiting_for_results = false;
+ };
+
+ std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that converts a stream of vector<T> to a stream of T.
+///
+/// This generator is async-reentrant but will never pull from source reentrantly.
+/// Instead, combine with MakeReadaheadGenerator.
+template <typename T>
+AsyncGenerator<T> MakeConcatenatedVectorGenerator(
Review comment:
Maybe `MakeFlattenGenerator` to match `MakeFlattenIterator`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]