pitrou commented on a change in pull request #9714:
URL: https://github.com/apache/arrow/pull/9714#discussion_r599022185
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,103 @@ class ReadaheadGenerator {
std::queue<Future<T>> readahead_queue_;
};
+/// \brief A generator where the producer pushes items on a queue.
+///
+/// No back-pressure is applied, so this generator is mostly useful when
+/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
+/// filesystem metadata).
+///
+/// This generator is not async-reentrant.
+template <typename T>
+class PushGenerator {
+ struct State {
+ util::Mutex mutex;
+ std::deque<Result<T>> result_q;
+ util::optional<Future<T>> consumer_fut;
+ bool finished = false;
+ };
+
+ public:
+ /// Producer API for PushGenerator
+ class Producer {
+ public:
+ explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+
+ /// Push a value on the queue
+ void Push(Result<T> result) {
+ auto lock = state_->mutex.Lock();
+ if (state_->finished) {
+ // Closed early
+ return;
+ }
+ if (state_->consumer_fut.has_value()) {
+ auto fut = std::move(state_->consumer_fut.value());
+ state_->consumer_fut.reset();
+ lock.Unlock(); // unlock before potentially invoking a callback
+ fut.MarkFinished(std::move(result));
+ return;
+ }
+ state_->result_q.push_back(std::move(result));
+ }
+
+ /// \brief Tell the consumer we have finished producing
+ ///
+ /// It is allowed to call this and later call Push() again ("early close").
+ /// In this case, calls to Push() after the queue is closed are silently
+ /// ignored. This can help implementing non-trivial cancellation cases.
+ void Close() {
+ auto lock = state_->mutex.Lock();
+ if (state_->finished) {
+ // Already closed
+ return;
+ }
+ state_->finished = true;
+ if (state_->consumer_fut.has_value()) {
Review comment:
No, `Close()` has nothing to do with cancellation. It signals a regular
end-of-stream.
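
To illustrate the distinction, here is a minimal standalone sketch of the same idea. It is not the Arrow implementation: `SimplePushQueue`, its `Status` enum, and `DrainAfterClose` are hypothetical names, the consumer side is a plain non-blocking `Pop()` rather than a `Future`, and it assumes that values queued before `Close()` remain deliverable (the quoted hunk does not show the consumer side). Closing only marks the end of the stream; a later push is dropped rather than treated as an error.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for PushGenerator (not the Arrow API).
template <typename T>
class SimplePushQueue {
 public:
  enum class Status { kValue, kEmpty, kEndOfStream };

  // Returns false when the value was dropped because the queue is closed.
  bool Push(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (finished_) return false;  // pushed after Close(): silently ignored
    queue_.push_back(std::move(value));
    return true;
  }

  // Marks a regular end-of-stream; already-queued values stay deliverable.
  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    finished_ = true;
  }

  // Non-blocking pop: queued values first, then the end-of-stream marker.
  Status Pop(T* out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!queue_.empty()) {
      *out = std::move(queue_.front());
      queue_.pop_front();
      return Status::kValue;
    }
    return finished_ ? Status::kEndOfStream : Status::kEmpty;
  }

 private:
  std::mutex mutex_;
  std::deque<T> queue_;
  bool finished_ = false;
};

// Pushes two values, closes, attempts a late push, then drains the queue.
std::vector<int> DrainAfterClose() {
  SimplePushQueue<int> q;
  q.Push(1);
  q.Push(2);
  q.Close();
  const bool accepted = q.Push(3);  // late push is dropped, not an error
  assert(!accepted);
  (void)accepted;  // keeps the variable used when NDEBUG disables assert

  std::vector<int> seen;
  int value = 0;
  while (q.Pop(&value) == SimplePushQueue<int>::Status::kValue) {
    seen.push_back(value);
  }
  return seen;  // the loop stopped on kEndOfStream
}
```

In this sketch the consumer still receives everything pushed before `Close()` and then sees an ordinary end-of-stream. Nothing is aborted, which is why closing is a different operation from cancelling.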