lidavidm commented on code in PR #15104:
URL: https://github.com/apache/arrow/pull/15104#discussion_r1059402726
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -43,16 +44,20 @@
namespace arrow {
using internal::checked_cast;
+using internal::ThreadPool;
using internal::ToChars;
namespace compute {
namespace {
struct ExecPlanImpl : public ExecPlan {
- explicit ExecPlanImpl(QueryOptions options, ExecContext* exec_context,
- std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
- : metadata_(std::move(metadata)), query_context_(options, *exec_context) {}
+ explicit ExecPlanImpl(QueryOptions options, ExecContext exec_context,
+ std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR,
+ std::shared_ptr<ThreadPool> owned_thread_pool = nullptr)
Review Comment:
nit: be consistent with NULLPTR/nullptr (note that NULLPTR is really only
necessary in headers, not in source files)
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -188,8 +192,20 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
void AbortUnlocked(const Status& st, std::unique_lock<std::mutex>&& lk) {
DCHECK(!st.ok());
+ bool aborted = false;
if (!IsAborted()) {
maybe_error_ = st;
+ // Add one more "task" to represent running the abort callback. This
+ // will prevent any other task finishing and marking the scheduler finished
+ // while we are running the abort callback.
+ running_tasks_++;
+ aborted = true;
+ }
+ if (aborted) {
+ lk.unlock();
+ std::move(abort_callback_)(st);
+ lk.lock();
+ running_tasks_--;
Review Comment:
This should probably be inside the lock?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -90,6 +95,15 @@ struct ExecPlanImpl : public ExecPlan {
if (started_) {
return Status::Invalid("restarted ExecPlan");
}
+ if (query_context_.exec_context()->executor() == nullptr) {
+ return Status::Invalid(
+ "An exec plan must have an executor for CPU tasks. To run without threads use "
+ "a SerialExeuctor (the arrow::compute::DeclarationTo... methods should take "
+ "care of this for you and are an easier way to execute an ExecPlan.");
Review Comment:
nit: the parenthesis opened before "the arrow::compute::DeclarationTo..." in this error message is never closed :)
##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -419,37 +429,116 @@ struct ARROW_EXPORT Declaration {
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
-ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
- Declaration declaration, ExecContext* exec_context = default_exec_context());
+///
+/// If `use_threads` is false then all CPU work will be done on the calling thread. I/O
+/// tasks will still happen on the I/O executor and may be multi-threaded (but should
+/// not use significant CPU resources)
+ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
+ bool use_threads = true);
/// \brief Asynchronous version of \see DeclarationToTable
+///
+/// The behavior of use_threads is slightly different than the synchronous version since
+/// we cannot run synchronously on the calling thread. Instead, if use_threads=false then
+/// a new thread pool will be created with a single thread and this will be used for all
+/// compute work.
+///
+/// If a custom exec context is provided then the value of `use_threads` will be ignored
+/// and the executor in the custom context will be used. However, the executor must
Review Comment:
It doesn't seem to be possible to pass both an exec context and `use_threads`
anymore, so this paragraph about `use_threads` being ignored looks stale.
##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -381,19 +388,11 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
protected:
void Finish(const Status& finish_st) {
- plan_->query_context()->async_scheduler()->AddSimpleTask([this, &finish_st] {
- return consumer_->Finish().Then(
- [this, finish_st]() {
- finished_.MarkFinished(finish_st);
- return finish_st;
- },
- [this, finish_st](const Status& st) {
- // Prefer the plan error over the consumer error
- Status final_status = finish_st & st;
- finished_.MarkFinished(final_status);
- return final_status;
- });
- });
+ if (finish_st.ok()) {
+ plan_->query_context()->async_scheduler()->AddSimpleTask(
+ [this] { return consumer_->Finish(); });
Review Comment:
Shouldn't we still mark finished_ as finished?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]