pitrou commented on a change in pull request #9892: URL: https://github.com/apache/arrow/pull/9892#discussion_r608445248
########## File path: cpp/src/arrow/util/thread_pool.h ########## @@ -189,6 +190,64 @@ class ARROW_EXPORT Executor { StopCallback&&) = 0; }; +/// \brief An executor implementation that runs all tasks on a single thread using an +/// event loop. +/// +/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are +/// fine but if one task needs to wait for another task it must be expressed as an +/// asynchronous continuation. +class ARROW_EXPORT SerialExecutor : public Executor { + public: + template <typename T = ::arrow::detail::Empty> + using FinishSignal = internal::FnOnce<void(const Result<T>&)>; + template <typename T = ::arrow::detail::Empty> + using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>; + + SerialExecutor(); + ~SerialExecutor(); + + int GetCapacity() override { return 1; }; + Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken, + StopCallback&&) override; + + /// \brief Runs the scheduler and any scheduled tasks + /// + /// The scheduler must either return an invalid status or call the finish signal. + /// Failure to do this will result in a deadlock. For this reason it is preferable (if + /// possible) to use the helper methods (below) RunSynchronously/RunSerially which + /// delegate the responsibility onto a Future producer's existing responsibility to + /// always mark a future finished (which can someday be aided by ARROW-12207). 
+ template <typename T> + static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) { + auto serial_executor = std::make_shared<SerialExecutor>(); + return serial_executor->Run<T>(std::move(initial_task)); + } + + private: + // State uses mutex + struct State; + std::unique_ptr<State> state_; + + template <typename T> + Result<T> Run(Scheduler<T> initial_task) { + bool finished = false; + Result<T> final_result; + FinishSignal<T> finish_signal = [&](const Result<T>& res) { + // The finish signal could be called from an external executor callback so need to + // protect it with the mutex. Also, final_result must be set before the call to + // MarkFinished here to ensure we don't try and return it before it is finished + // setting + final_result = res; + MarkFinished(finished); + }; + ARROW_RETURN_NOT_OK(std::move(initial_task)(this, std::move(finish_signal))); + RunLoop(finished); + return final_result; + } + void RunLoop(const bool& finished); + void MarkFinished(bool& finished); Review comment: Also, it seems `bool finished` may simply be an internal state member of `SerialExecutor`, e.g.: ``` void RunLoop(); void FinishRunning(); ``` (though, is the executor meant to be restartable?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org