pitrou commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608445248



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread 
using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  
Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed 
as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+  template <typename T = ::arrow::detail::Empty>
+  using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+  template <typename T = ::arrow::detail::Empty>
+  using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;
+
+  SerialExecutor();
+  ~SerialExecutor();
+
+  int GetCapacity() override { return 1; };
+  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+                   StopCallback&&) override;
+
+  /// \brief Runs the scheduler and any scheduled tasks
+  ///
+  /// The scheduler must either return an invalid status or call the finish 
signal.
+  /// Failure to do this will result in a deadlock.  For this reason it is 
preferable (if
+  /// possible) to use the helper methods (below) RunSynchronously/RunSerially, which
+  /// delegate the responsibility onto a Future producer's existing responsibility to
+  /// always mark a future finished (which can someday be aided by ARROW-12207).
+  template <typename T>
+  static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) {
+    auto serial_executor = std::make_shared<SerialExecutor>();
+    return serial_executor->Run<T>(std::move(initial_task));
+  }
+
+ private:
+  // State uses mutex
+  struct State;
+  std::unique_ptr<State> state_;
+
+  template <typename T>
+  Result<T> Run(Scheduler<T> initial_task) {
+    bool finished = false;
+    Result<T> final_result;
+    FinishSignal<T> finish_signal = [&](const Result<T>& res) {
+      // The finish signal could be called from an external executor callback, so we need to
+      // protect it with the mutex.  Also, final_result must be set before the call to
+      // MarkFinished here to ensure we don't try to return it before it has finished
+      // being set.
+      final_result = res;
+      MarkFinished(finished);
+    };
+    ARROW_RETURN_NOT_OK(std::move(initial_task)(this, 
std::move(finish_signal)));
+    RunLoop(finished);
+    return final_result;
+  }
+  void RunLoop(const bool& finished);
+  void MarkFinished(bool& finished);

Review comment:
       Also, it seems `bool finished` may simply be an internal state member of 
`SerialExecutor`, e.g.:
   ```
   void RunLoop();
   void FinishRunning();
   ```
   
   (though, is the executor meant to be restartable?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to