westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608219924



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+  struct InnerCallback {
+    void operator()(const Result<T> res) { std::move(finish_signal)(std::move(res)); }
+    SerialExecutor::FinishSignal<T> finish_signal;
+  };
+  struct OuterCallback {
+    Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> finish_signal) {
+      auto fut = std::move(get_future)(executor);
+      fut.AddCallback(InnerCallback{std::move(finish_signal)});
+      return Status::OK();
+    }
+    FnOnce<Future<T>(Executor*)> get_future;
+  };
+  return SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true
+/// \see RunSerially
+///
+/// If `use_threads` is false then the operation is run normally but this method will
+/// still block the calling thread until the operation has completed.
+template <typename T>
+Result<T> RunSynchronously(FnOnce<Future<T>(Executor*)> get_future, bool use_threads) {

Review comment:
       I'm not sure I follow.  The purpose of this code is to run asynchronous 
code synchronously in the calling thread, so the function cannot return until 
the future has been completed.
   
   For example, consider an asynchronous operation that reads a file from the 
filesystem and parses it into a schema.  First...
   
   * The calling thread sets up the read (this happens in the calling thread 
even if run asynchronously) and then launches the read with `io_executor->Submit`.
   * The I/O thread performs the read.  Meanwhile, the calling thread waits for 
the read to finish.  This is different from the standard asynchronous flow, in 
which the calling thread would have returned a future at this point.
   * The I/O thread finishes the read and adds a follow-up task to the CPU 
executor.  In this case that is our newly created thread pool.  The calling 
thread wakes up and processes the follow-up task.  In normal asynchronous flow 
this follow-up task would have been submitted to the CPU thread pool.
   * The calling thread finishes the follow-up task and calls the finish 
signal.  The calling thread then returns all the way out of the 
`RunSynchronously` call.  In normal asynchronous flow we would have returned 
from that call long ago and the CPU thread would just be marking a future 
complete.
   
   However, there are some other choices we can make here.
   
   * We could require that the I/O executor be replaced with our newly created 
serial executor as well.  This way we don't use any I/O threads OR CPU threads 
(this is how the previous implementation operated anyway).  In that case we 
don't need the finish condition because we know we are done when we run out of 
tasks in the thread pool.  We could also modify the `SpawnReal` function to 
return an invalid status if the current calling thread was not the original 
calling thread (this would safeguard against mistakenly using the I/O executor).
   * We could keep track (perhaps in a thread-local) of a counter incremented 
when we schedule tasks on other executors and decremented when those tasks 
finish (an "outgoing task count").  In that case we don't need the finish 
condition because we know we are finished when `outgoing task count == 0 && 
task_queue.empty()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

