westonpace commented on a change in pull request #9714:
URL: https://github.com/apache/arrow/pull/9714#discussion_r599009667
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,103 @@ class ReadaheadGenerator {
std::queue<Future<T>> readahead_queue_;
};
+/// \brief A generator where the producer pushes items on a queue.
+///
+/// No back-pressure is applied, so this generator is mostly useful when
+/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
+/// filesystem metadata).
+///
+/// This generator is not async-reentrant.
+template <typename T>
+class PushGenerator {
+ struct State {
+ util::Mutex mutex;
+ std::deque<Result<T>> result_q;
+ util::optional<Future<T>> consumer_fut;
+ bool finished = false;
+ };
+
+ public:
+ /// Producer API for PushGenerator
+ class Producer {
+ public:
+ explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+
+ /// Push a value on the queue
+ void Push(Result<T> result) {
+ auto lock = state_->mutex.Lock();
+ if (state_->finished) {
+ // Closed early
+ return;
+ }
+ if (state_->consumer_fut.has_value()) {
+ auto fut = std::move(state_->consumer_fut.value());
+ state_->consumer_fut.reset();
+ lock.Unlock(); // unlock before potentially invoking a callback
+ fut.MarkFinished(std::move(result));
+ return;
+ }
+ state_->result_q.push_back(std::move(result));
+ }
+
+ /// \brief Tell the consumer we have finished producing
+ ///
+ /// It is allowed to call this and later call Push() again ("early close").
+ /// In this case, calls to Push() after the queue is closed are silently
+ /// ignored. This can help implementing non-trivial cancellation cases.
+ void Close() {
+ auto lock = state_->mutex.Lock();
+ if (state_->finished) {
+ // Already closed
+ return;
+ }
+ state_->finished = true;
+ if (state_->consumer_fut.has_value()) {
Review comment:
You could potentially clear `result_q` here. I could understand
either approach. However, if `Close` is semantically the same as cancel, you
presumably wouldn't want the downstream to keep processing the results that
were already generated.
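To make the two options concrete, here is a rough sketch. It is not Arrow code:
`ToyPushQueue`, `discard_pending` and the bool-returning `Pop` are made-up
names, and there are no futures involved, so it only models the queue
bookkeeping and assumes a single flag is enough to pick between "drain" and
"cancel".

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, simplified stand-in for the PushGenerator state.
// Pop() returns false both for "nothing queued yet" and for
// "closed and drained"; a real generator would distinguish the two.
template <typename T>
class ToyPushQueue {
 public:
  void Push(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (finished_) return;  // pushes after Close() are silently ignored
    queue_.push_back(std::move(value));
  }

  // discard_pending == true models "Close means cancel": queued results
  // are dropped. false models "Close means end of stream": queued
  // results can still be consumed.
  void Close(bool discard_pending) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (finished_) return;  // already closed
    finished_ = true;
    if (discard_pending) queue_.clear();
  }

  // Moves the oldest queued value into *out and returns true,
  // or returns false if the queue is empty.
  bool Pop(T* out) {
    assert(out != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::deque<T> queue_;
  bool finished_ = false;
};
```

With `Close(false)` a consumer still sees everything pushed before the close;
with `Close(true)` it sees nothing further. Which one is right depends on
whether `Close` is meant as "end of stream" or as "cancel".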
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -940,4 +942,144 @@ TEST(TestAsyncIteratorTransform, SkipSome) {
AssertAsyncGeneratorMatch({"1", "3"}, std::move(filtered));
}
+TEST(PushGenerator, Empty) {
+ PushGenerator<TestInt> gen;
+ auto producer = gen.producer();
+
+ auto fut = gen();
+ AssertNotFinished(fut);
+ producer.Close();
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), fut);
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+
+ // Close idempotent
+ fut = gen();
+ producer.Close();
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), fut);
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
Review comment:
I feel like this check might be unnecessary? Can't hurt though.
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -940,4 +942,144 @@ TEST(TestAsyncIteratorTransform, SkipSome) {
AssertAsyncGeneratorMatch({"1", "3"}, std::move(filtered));
}
+TEST(PushGenerator, Empty) {
+ PushGenerator<TestInt> gen;
+ auto producer = gen.producer();
+
+ auto fut = gen();
+ AssertNotFinished(fut);
+ producer.Close();
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), fut);
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+
+ // Close idempotent
+ fut = gen();
+ producer.Close();
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), fut);
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+}
+
+TEST(PushGenerator, Success) {
+ PushGenerator<TestInt> gen;
+ auto producer = gen.producer();
+ std::vector<Future<TestInt>> futures;
+
+ producer.Push(TestInt{1});
+ producer.Push(TestInt{2});
+ for (int i = 0; i < 3; ++i) {
+ futures.push_back(gen());
+ }
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, futures[0]);
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{2}, futures[1]);
+ AssertNotFinished(futures[2]);
+
+ producer.Push(TestInt{3});
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{3}, futures[2]);
+ producer.Push(TestInt{4});
+ futures.push_back(gen());
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{4}, futures[3]);
+ producer.Push(TestInt{5});
+ producer.Close();
+ for (int i = 0; i < 4; ++i) {
+ futures.push_back(gen());
+ }
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{5}, futures[4]);
+ for (int i = 5; i < 8; ++i) {
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), futures[i]);
+ }
+ ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
+}
+
+TEST(PushGenerator, Errors) {
+ PushGenerator<TestInt> gen;
+ auto producer = gen.producer();
+ std::vector<Future<TestInt>> futures;
+
+ producer.Push(TestInt{1});
+ producer.Push(Status::Invalid("2"));
+ for (int i = 0; i < 3; ++i) {
+ futures.push_back(gen());
+ }
+ ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, futures[0]);
+ ASSERT_FINISHES_AND_RAISES(Invalid, futures[1]);
+ AssertNotFinished(futures[2]);
Review comment:
Sorry I didn't answer the earlier question (should an error always
terminate a generator?). This seems to be what your test checks here. From the
general async generator concept, I think what happens after an error is
unspecified. Continuing to produce items, as this test does, is valid.
Terminating early would also be valid. Downstream generators should be written
to expect either possibility and should not rely on an error automatically
terminating successive calls.
Which is a long-winded way of saying this is valid.
It would also be OK if `futures[2]` was `IterationTraits<TestInt>::End()`
here.
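As an illustration of "don't rely on errors terminating the stream", here is a
small synchronous sketch. It is not Arrow code: `ToyItem`, `ToyGenerator`,
`MakeToyGenerator` and `DrainUntilErrorOrEnd` are made-up names, and a plain
`std::function` stands in for an async generator. The consumer stops by itself
on the first error, so it behaves the same whether the upstream keeps producing
after the error or ends right away.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical item type: a value, an error, or the end-of-stream marker.
struct ToyItem {
  enum Kind { kValue, kError, kEnd } kind;
  int value;
};

using ToyGenerator = std::function<ToyItem()>;

struct DrainResult {
  std::vector<int> values;
  bool failed = false;
};

// Collects values until the first error or the end marker. The loop exits
// explicitly on an error instead of waiting for the generator to end.
DrainResult DrainUntilErrorOrEnd(const ToyGenerator& gen) {
  assert(static_cast<bool>(gen));
  DrainResult out;
  for (;;) {
    ToyItem item = gen();
    if (item.kind == ToyItem::kEnd) return out;
    if (item.kind == ToyItem::kError) {
      out.failed = true;
      return out;
    }
    out.values.push_back(item.value);
  }
}

// Replays the given items in order, then returns kEnd forever.
ToyGenerator MakeToyGenerator(std::vector<ToyItem> items) {
  auto index = std::make_shared<std::size_t>(0);
  return [items, index]() -> ToyItem {
    if (*index >= items.size()) return ToyItem{ToyItem::kEnd, 0};
    return items[(*index)++];
  };
}
```

A generator that yields value, error, value and one that yields value, error,
end give the same `DrainResult` here, which is the property a downstream
consumer would want.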