westonpace commented on a change in pull request #9941:
URL: https://github.com/apache/arrow/pull/9941#discussion_r611956685



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1167,90 +1164,180 @@ class BackgroundGenerator {
   struct State {
     State(internal::Executor* io_executor, Iterator<T> it, int max_q, int 
q_restart)
         : io_executor(io_executor),
+          max_q(max_q),
+          q_restart(q_restart),
           it(std::move(it)),
-          running(false),
+          reading(false),
           finished(false),
-          max_q(max_q),
-          q_restart(q_restart) {}
+          should_shutdown(false) {}
 
     void ClearQueue() {
       while (!queue.empty()) {
         queue.pop();
       }
     }
 
-    void RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
-      if (!finished) {
-        running = true;
-        auto spawn_status = io_executor->Spawn([state]() { 
Task()(std::move(state)); });
-        if (!spawn_status.ok()) {
-          running = false;
-          finished = true;
-          if (waiting_future.has_value()) {
-            auto to_deliver = std::move(waiting_future.value());
-            waiting_future.reset();
-            guard.Unlock();
-            to_deliver.MarkFinished(spawn_status);
-          } else {
-            ClearQueue();
-            queue.push(spawn_status);
-          }
+    bool TaskIsRunning() const { return task_finished.is_valid(); }
+
+    bool NeedsRestart() const {
+      return !finished && !reading && static_cast<int>(queue.size()) <= 
q_restart;
+    }
+
+    void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) 
{
+      // If we get here we are actually going to start a new task so let's 
create a
+      // task_finished future for it
+      state->task_finished = Future<>::Make();
+      state->reading = true;
+      auto spawn_status = io_executor->Spawn([state]() { 
Task()(std::move(state)); });
+      if (!spawn_status.ok()) {
+        // If we can't spawn a new task then send an error to the consumer 
(either via a
+        // waiting future or the queue) and mark ourselves finished
+        state->finished = true;
+        state->task_finished = Future<>();
+        if (waiting_future.has_value()) {
+          auto to_deliver = std::move(waiting_future.value());
+          waiting_future.reset();
+          guard.Unlock();
+          to_deliver.MarkFinished(spawn_status);
+        } else {
+          ClearQueue();
+          queue.push(spawn_status);
         }
       }
     }
 
+    Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard 
guard,
+                          Future<T> next) {
+      if (TaskIsRunning()) {
+        // If the task is still cleaning up we need to wait for it to finish 
before
+        // restarting.  We also want to block the consumer until we've 
restarted the
+        // reader to avoid multiple restarts
+        return task_finished.Then([state, next](...) {
+          // This may appear dangerous (recursive mutex) but we should be 
guaranteed the
+          // outer guard has been released by this point.  We know...
+          // * task_finished is not already finished (it would be invalid in 
that case)
+          // * task_finished will not be marked complete until we've given up 
the mutex
+          auto guard_ = state->mutex.Lock();
+          state->DoRestartTask(state, std::move(guard_));
+          return next;
+        });
+      }
+      // Otherwise we can restart immediately
+      DoRestartTask(std::move(state), std::move(guard));
+      return next;
+    }
+
     internal::Executor* io_executor;
+    const int max_q;
+    const int q_restart;
     Iterator<T> it;
-    bool running;
+
+    // If true, the task is actively pumping items from the queue and does not 
need a
+    // restart
+    bool reading;
+    // Set to true when a terminal item arrives
     bool finished;
-    int max_q;
-    int q_restart;
+    // Signal to the background task to end early because consumers have given 
up on it
+    bool should_shutdown;
+    // If the queue is empty then the consumer will create a waiting future 
and wait for
+    // it
     std::queue<Result<T>> queue;
     util::optional<Future<T>> waiting_future;
+    // Every background task is given a future to complete when it is entirely 
finished
+    // processing and ready for the next task to start or for State to be 
destroyed
+    Future<> task_finished;
     util::Mutex mutex;
   };
 
+  // Cleanup task that will be run when all consumer references to the 
generator are lost
+  struct Cleanup {
+    explicit Cleanup(State* state) : state(state) {}
+    ~Cleanup() {
+      Future<> finish_fut;
+      {
+        auto lock = state->mutex.Lock();
+        if (!state->TaskIsRunning()) {
+          return;
+        }
+        // Signal the current task to stop and wait for it to finish
+        state->should_shutdown = true;
+        finish_fut = state->task_finished;
+      }
+      // Using future as a condition variable here
+      Status st = finish_fut.status();
+      ARROW_UNUSED(st);
+    }
+    State* state;
+  };
+
   class Task {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to