westonpace commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614257805



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) 
{}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) 
{}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       Since `is_closed` is already "best effort" you could probably make 
`finished` an atomic and avoid the lock here but that's a pretty minor concern 
and could potentially make other paths more expensive.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to