westonpace commented on a change in pull request #9941:
URL: https://github.com/apache/arrow/pull/9941#discussion_r609106645



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1195,25 +1199,48 @@ class BackgroundGenerator {
             ClearQueue();
             queue.push(spawn_status);
           }
+          task_finished.MarkFinished();
         }
       }
     }
 
     internal::Executor* io_executor;
     Iterator<T> it;
+    bool started;
     bool running;
     bool finished;
+    bool should_shutdown;
     int max_q;
     int q_restart;
     std::queue<Result<T>> queue;
     util::optional<Future<T>> waiting_future;
     util::Mutex mutex;
+    Future<> task_finished;
+  };
+
+  struct Cleanup {
+    explicit Cleanup(State* state) : state(state) {}
+    ~Cleanup() {
+      Future<> finish_fut;
+      {
+        auto lock = state->mutex.Lock();
+        if (!state->started) {

Review comment:
       I don't think running works.  Consider the cleanup happens while the 
background thread is about to call `waiting_future.MarkFinished(next);` and the 
queue has just filled up so `running` is false.  If we don't wait and just 
return then it is possible the callback accesses deleted state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to