westonpace commented on a change in pull request #9941: URL: https://github.com/apache/arrow/pull/9941#discussion_r611956685
########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -1167,90 +1164,180 @@ class BackgroundGenerator { struct State { State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart) : io_executor(io_executor), + max_q(max_q), + q_restart(q_restart), it(std::move(it)), - running(false), + reading(false), finished(false), - max_q(max_q), - q_restart(q_restart) {} + should_shutdown(false) {} void ClearQueue() { while (!queue.empty()) { queue.pop(); } } - void RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) { - if (!finished) { - running = true; - auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); - if (!spawn_status.ok()) { - running = false; - finished = true; - if (waiting_future.has_value()) { - auto to_deliver = std::move(waiting_future.value()); - waiting_future.reset(); - guard.Unlock(); - to_deliver.MarkFinished(spawn_status); - } else { - ClearQueue(); - queue.push(spawn_status); - } + bool TaskIsRunning() const { return task_finished.is_valid(); } + + bool NeedsRestart() const { + return !finished && !reading && static_cast<int>(queue.size()) <= q_restart; + } + + void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) { + // If we get here we are actually going to start a new task so let's create a + // task_finished future for it + state->task_finished = Future<>::Make(); + state->reading = true; + auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); + if (!spawn_status.ok()) { + // If we can't spawn a new task then send an error to the consumer (either via a + // waiting future or the queue) and mark ourselves finished + state->finished = true; + state->task_finished = Future<>(); + if (waiting_future.has_value()) { + auto to_deliver = std::move(waiting_future.value()); + waiting_future.reset(); + guard.Unlock(); + to_deliver.MarkFinished(spawn_status); + } else { + ClearQueue(); + queue.push(spawn_status); } } } + Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard, + Future<T> next) { + if (TaskIsRunning()) { + // If the task is still cleaning up we need to wait for it to finish before + // restarting. We also want to block the consumer until we've restarted the + // reader to avoid multiple restarts + return task_finished.Then([state, next](...) { + // This may appear dangerous (recursive mutex) but we should be guaranteed the + // outer guard has been released by this point. We know... + // * task_finished is not already finished (it would be invalid in that case) + // * task_finished will not be marked complete until we've given up the mutex + auto guard_ = state->mutex.Lock(); + state->DoRestartTask(state, std::move(guard_)); + return next; + }); + } + // Otherwise we can restart immediately + DoRestartTask(std::move(state), std::move(guard)); + return next; + } + internal::Executor* io_executor; + const int max_q; + const int q_restart; Iterator<T> it; - bool running; + + // If true, the task is actively pumping items from the queue and does not need a + // restart + bool reading; + // Set to true when a terminal item arrives bool finished; - int max_q; - int q_restart; + // Signal to the background task to end early because consumers have given up on it + bool should_shutdown; + // If the queue is empty then the consumer will create a waiting future and wait for + // it std::queue<Result<T>> queue; util::optional<Future<T>> waiting_future; + // Every background task is given a future to complete when it is entirely finished + // processing and ready for the next task to start or for State to be destroyed + Future<> task_finished; util::Mutex mutex; }; + // Cleanup task that will be run when all consumer references to the generator are lost + struct Cleanup { + explicit Cleanup(State* state) : state(state) {} + ~Cleanup() { + Future<> finish_fut; + { + auto lock = state->mutex.Lock(); + if (!state->TaskIsRunning()) { + return; + } + // Signal the current task to stop and wait for it to finish + state->should_shutdown = true; + finish_fut = state->task_finished; + } + // Using future as a condition variable here + Status st = finish_fut.status(); + ARROW_UNUSED(st); + } + State* state; + }; + class Task { Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org