westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608243837
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the
thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+ struct InnerCallback {
+ void operator()(const Result<T> res) {
std::move(finish_signal)(std::move(res)); }
+ SerialExecutor::FinishSignal<T> finish_signal;
+ };
+ struct OuterCallback {
+ Status operator()(Executor* executor, SerialExecutor::FinishSignal<T>
finish_signal) {
+ auto fut = std::move(get_future)(executor);
+ fut.AddCallback(InnerCallback{std::move(finish_signal)});
+ return Status::OK();
+ }
+ FnOnce<Future<T>(Executor*)> get_future;
+ };
+ return
SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true
Review comment:
Used the suggested comments, thanks.
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
+/// \brief Runs a potentially async operation serially
Review comment:
Fixed.
##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
std::vector<int> outs_;
};
+template <typename T = arrow::detail::Empty>
+struct TerminalCallback {
+ void operator()() {
+ auto result = std::move(callback)();
+ std::move(finish_signal)(result);
+ }
+
+ FnOnce<Result<T>()> callback;
+ SerialExecutor::FinishSignal<T> finish_signal;
+};
+
+template <>
+struct TerminalCallback<arrow::detail::Empty> {
+ void operator()() {
+ auto st = std::move(callback)();
+ if (!st.ok()) {
+ std::move(finish_signal)(st);
+ } else {
+ std::move(finish_signal)(arrow::detail::Empty());
+ }
+ }
+
+ FnOnce<Status()> callback;
+ SerialExecutor::FinishSignal<> finish_signal;
+};
+
+TEST(TestSerialExecutor, Create) {
+ bool task_ran = false;
+ SerialExecutor::Scheduler<> task = [&](Executor* executor,
+ SerialExecutor::FinishSignal<>
finish_signal) {
+ EXPECT_TRUE(executor != nullptr);
Review comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]