pitrou commented on a change in pull request #9941:
URL: https://github.com/apache/arrow/pull/9941#discussion_r609432471



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1195,36 +1199,71 @@ class BackgroundGenerator {
             ClearQueue();
             queue.push(spawn_status);
           }
+          task_finished.MarkFinished();
         }
       }
     }
 
     internal::Executor* io_executor;
     Iterator<T> it;
-    bool running;
+    // True if we are still running in the loop and will be adding more items to the
+    // queue, don't restart the task if this is true.  However, even if this is false we
+    // might still be running some finish callbacks or marking the finish future.
+    bool running_in_loop;
+    // If this is false then the background thread is done with everything.  It will not
+    // be running any additional callbacks or marking the finish future.  There is no need
+    // to wait for it when cleaning up.
+    bool running_at_all;
     bool finished;

Review comment:
       This is where the complexity starts to become difficult to reason about. What's the difference between `running_at_all` and `finished`? What are the possible combinations?
   You may want to define an enum to describe the current state rather than having three different booleans...
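One possible encoding of the enum suggestion, sketched under assumptions: the enum `BackgroundState`, its enumerators, and the helpers `FromFlags` and `MustWaitForBackground` are hypothetical, not part of the PR. The mapping relies only on what the diff comments state (`running_in_loop` implies `running_at_all`; once `running_at_all` is false there is nothing to wait for during cleanup) plus the assumption that `finished` means no more items will be produced.

```cpp
#include <cassert>
#include <optional>

// Hypothetical enum replacing running_in_loop / running_at_all / finished.
// Each enumerator is one legal combination of the old flags.
enum class BackgroundState {
  kIdle,         // no task running, more items may come: task may be (re)started
  kInLoop,       // running_in_loop: task will push more items, don't restart it
  kWindingDown,  // left the loop but callbacks may still run; not finished
  kFinishing,    // finished, but callbacks / finish future still pending
  kDone,         // finished and the background thread is completely done
};

// Maps the old flag triple onto the enum. Returns std::nullopt for the
// combination the diff comments rule out: in the loop but not running at all.
inline std::optional<BackgroundState> FromFlags(bool running_in_loop,
                                                bool running_at_all,
                                                bool finished) {
  if (running_in_loop) {
    if (!running_at_all) return std::nullopt;  // contradictory flags
    return BackgroundState::kInLoop;
  }
  if (running_at_all) {
    return finished ? BackgroundState::kFinishing
                    : BackgroundState::kWindingDown;
  }
  return finished ? BackgroundState::kDone : BackgroundState::kIdle;
}

// "Do we need to wait for the background thread when cleaning up?" becomes a
// question about one value: only states where running_at_all was true qualify.
inline bool MustWaitForBackground(BackgroundState s) {
  return s == BackgroundState::kInLoop || s == BackgroundState::kWindingDown ||
         s == BackgroundState::kFinishing;
}
```

With a single enum, the contradictory combination simply cannot be represented, and each transition (for example `kFinishing` to `kDone` where the diff sets `running_at_all = false`) becomes one assignment under the mutex.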

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1247,10 +1286,23 @@ class BackgroundGenerator {
           waiting_future.MarkFinished(next);
         }
       }
+      {
+        auto guard = state->mutex.Lock();
+        state->running_at_all = false;
+        if (state->finished) {
+          state->task_finished.MarkFinished();
+        }
+      }
     }
   };
 
   std::shared_ptr<State> state_;
+  // state_ is held by both the generator and the background thread so it won't be cleaned
+  // up when all consumer references are relinquished.  cleanup_ is only held by the
+  // generator so it will be destructed when the last consumer reference is gone.  We use
+  // this to cleanup / stop the background generator in case the consuming end stops
+  // listening (e.g. due to a downstream error)
+  std::shared_ptr<Cleanup> cleanup_;

Review comment:
       Would it be possible to simply use a `std::weak_ptr<State>` in the background thread?
   (If there is some cleanup to do, you may do it in the `State` destructor.)
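A minimal sketch of the `std::weak_ptr` suggestion, assuming a heavily simplified `State` (an `int` queue and a hypothetical `on_cleanup` hook standing in for whatever `Cleanup` currently does). `BackgroundStep` is also hypothetical and stands in for one iteration of the background loop. Consumers hold the only strong references, so releasing the last one destroys `State` and runs its destructor; the background side then sees an expired pointer and exits.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>

// Hypothetical, simplified stand-in for BackgroundGenerator::State.
struct State {
  std::mutex mutex;
  std::deque<int> queue;
  int next_value = 0;
  std::function<void()> on_cleanup;  // hypothetical: stop the source, etc.

  // Runs once the last strong (consumer) reference is released, replacing
  // the separate Cleanup object.
  ~State() {
    if (on_cleanup) on_cleanup();
  }
};

// One iteration of the background loop. The background side only holds a
// weak_ptr, so it never keeps State alive by itself. Returns false once all
// consumers are gone and the loop should exit.
inline bool BackgroundStep(const std::weak_ptr<State>& weak_state) {
  std::shared_ptr<State> state = weak_state.lock();
  if (!state) return false;  // consumer side released the generator
  // guard is declared after state, so it unlocks before state can be
  // destroyed at the end of this scope.
  std::lock_guard<std::mutex> guard(state->mutex);
  state->queue.push_back(state->next_value++);
  return true;
}
```

One trade-off worth noting: if the last consumer reference is dropped while a step holds the locked `shared_ptr`, `~State` runs on the background thread at the end of that step rather than on the consumer's thread, so any cleanup moved into the destructor must be safe to run there.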



