lidavidm commented on code in PR #15104:
URL: https://github.com/apache/arrow/pull/15104#discussion_r1059402726


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -43,16 +44,20 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::ThreadPool;
 using internal::ToChars;
 
 namespace compute {
 
 namespace {
 
 struct ExecPlanImpl : public ExecPlan {
-  explicit ExecPlanImpl(QueryOptions options, ExecContext* exec_context,
-                        std::shared_ptr<const KeyValueMetadata> metadata = 
NULLPTR)
-      : metadata_(std::move(metadata)), query_context_(options, *exec_context) 
{}
+  explicit ExecPlanImpl(QueryOptions options, ExecContext exec_context,
+                        std::shared_ptr<const KeyValueMetadata> metadata = 
NULLPTR,
+                        std::shared_ptr<ThreadPool> owned_thread_pool = 
nullptr)

Review Comment:
   nit: be consistent with NULLPTR/nullptr (note that NULLPTR is really only 
necessary in headers, not in source files)



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -188,8 +192,20 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
 
   void AbortUnlocked(const Status& st, std::unique_lock<std::mutex>&& lk) {
     DCHECK(!st.ok());
+    bool aborted = false;
     if (!IsAborted()) {
       maybe_error_ = st;
+      // Add one more "task" to represent running the abort callback.  This
+      // will prevent any other task finishing and marking the scheduler 
finished
+      // while we are running the abort callback.
+      running_tasks_++;
+      aborted = true;
+    }
+    if (aborted) {
+      lk.unlock();
+      std::move(abort_callback_)(st);
+      lk.lock();
+      running_tasks_--;

Review Comment:
   This should probably be inside the lock?



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -90,6 +95,15 @@ struct ExecPlanImpl : public ExecPlan {
     if (started_) {
       return Status::Invalid("restarted ExecPlan");
     }
+    if (query_context_.exec_context()->executor() == nullptr) {
+      return Status::Invalid(
+          "An exec plan must have an executor for CPU tasks.  To run without 
threads use "
+          "a SerialExeuctor (the arrow::compute::DeclarationTo... methods 
should take "
+          "care of this for you and are an easier way to execute an 
ExecPlan.");

Review Comment:
   nit: unclosed parenthesis :)



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -419,37 +429,116 @@ struct ARROW_EXPORT Declaration {
 /// This method will add a sink node to the declaration to collect results 
into a
 /// table.  It will then create an ExecPlan from the declaration, start the 
exec plan,
 /// block until the plan has finished, and return the created table.
-ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
-    Declaration declaration, ExecContext* exec_context = 
default_exec_context());
+///
+/// If `use_threads` is false then all CPU work will be done on the calling 
thread.  I/O
+/// tasks will still happen on the I/O executor and may be multi-threaded (but 
should
+/// not use significant CPU resources)
+ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(Declaration 
declaration,
+                                                               bool 
use_threads = true);
 
 /// \brief Asynchronous version of \see DeclarationToTable
+///
+/// The behavior of use_threads is slightly different than the synchronous 
version since
+/// we cannot run synchronously on the calling thread.  Instead, if 
use_threads=false then
+/// a new thread pool will be created with a single thread and this will be 
used for all
+/// compute work.
+///
+/// If a custom exec context is provided then the value of `use_threads` will 
be ignored
+/// and the executor in the custom context will be used.  However, the 
executor must

Review Comment:
   It doesn't seem to be possible to pass both an exec context and use_threads 
anymore.



##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -381,19 +388,11 @@ class ConsumingSinkNode : public ExecNode, public 
BackpressureControl {
 
  protected:
   void Finish(const Status& finish_st) {
-    plan_->query_context()->async_scheduler()->AddSimpleTask([this, 
&finish_st] {
-      return consumer_->Finish().Then(
-          [this, finish_st]() {
-            finished_.MarkFinished(finish_st);
-            return finish_st;
-          },
-          [this, finish_st](const Status& st) {
-            // Prefer the plan error over the consumer error
-            Status final_status = finish_st & st;
-            finished_.MarkFinished(final_status);
-            return final_status;
-          });
-    });
+    if (finish_st.ok()) {
+      plan_->query_context()->async_scheduler()->AddSimpleTask(
+          [this] { return consumer_->Finish(); });

Review Comment:
   Shouldn't we still mark finished_ as finished?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to