zagto commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r949366764
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -584,17 +574,20 @@ class AsofJoinNode : public ExecNode {
// It may happen here in cases where InputFinished was called before we
were finished
// producing results (so we didn't know the output size at that time)
if (state_.at(0)->Finished()) {
- StopProducing();
- outputs_[0]->InputFinished(this, batches_produced_);
+ return output_->InputFinished(this, batches_produced_);
}
+ return Status::OK();
}
void ProcessThread() {
for (;;) {
if (!process_.Pop()) {
return;
}
- Process();
+ Status st = Process();
+ if (!st.ok()) {
+ plan_->Abort();
Review Comment:
If we get a non-OK status here, wouldn't we just abort while discarding the
Status and its message? That seems confusing to the user. Maybe we could add
an `ExecPlan::Abort(Status)` overload that appends the status to
`ExecPlanImpl::errors_`?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -92,7 +93,18 @@ struct ExecPlanImpl : public ExecPlan {
// Adds a task which submits fn to the executor and tracks its progress.
If we're
// already stopping then the task is ignored and fn is not executed.
return task_group_
- .AddTaskIfNotEnded([executor, fn]() { return
executor->Submit(std::move(fn)); })
+ .AddTaskIfNotEnded([this, executor, fn]() -> Result<Future<>> {
+ ARROW_ASSIGN_OR_RAISE(Future<> fut,
+ executor->Submit(stop_source_.token(),
std::move(fn)));
+ fut.AddCallback([this](const Status& status) {
+ if (!status.ok()) {
+ std::lock_guard<std::mutex> guard(abort_mutex_);
+ errors_.emplace_back(std::move(status));
Review Comment:
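I don't think this std::move does anything: `status` is a const reference, so
`std::move(status)` yields a `const Status&&`, which binds to the copy
constructor rather than the move constructor.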
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -545,12 +528,16 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
ConsumingSinkNodeOptions(consumer)));
ASSERT_OK(plan->StartProducing());
- // Source should finish fairly quickly
- ASSERT_FINISHES_OK(source->finished());
+ SleepABit();
+ SleepABit();
Review Comment:
Why do we need three calls to `SleepABit`? Probably because one may not be enough on
slower systems, but I think a comment explaining this would be helpful here.
##########
cpp/src/arrow/compute/exec/filter_node.cc:
##########
@@ -60,62 +61,67 @@ class FilterNode : public MapNode {
filter_expression.type()->ToString());
}
return plan->EmplaceNode<FilterNode>(plan, std::move(inputs),
std::move(schema),
- std::move(filter_expression),
- filter_options.async_mode);
+ std::move(filter_expression));
}
const char* kind_name() const override { return "FilterNode"; }
- Result<ExecBatch> DoFilter(const ExecBatch& target) {
+ Result<ExecBatch> DoFilter(ExecBatch batch) {
ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
- SimplifyWithGuarantee(filter_, target.guarantee));
-
+ SimplifyWithGuarantee(filter_, batch.guarantee));
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Filter",
{{"filter.expression", ToStringExtra()},
{"filter.expression.simplified",
simplified_filter.ToString()},
- {"filter.length", target.length}});
+ {"filter.length", batch.length}});
- ARROW_ASSIGN_OR_RAISE(Datum mask,
ExecuteScalarExpression(simplified_filter, target,
+ ARROW_ASSIGN_OR_RAISE(Datum mask,
ExecuteScalarExpression(simplified_filter, batch,
plan()->exec_context()));
if (mask.is_scalar()) {
const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
if (mask_scalar.is_valid && mask_scalar.value) {
- return target;
+ return batch;
}
- return target.Slice(0, 0);
+ return batch.Slice(0, 0);
}
// if the values are all scalar then the mask must also be
- DCHECK(!std::all_of(target.values.begin(), target.values.end(),
+ DCHECK(!std::all_of(batch.values.begin(), batch.values.end(),
[](const Datum& value) { return value.is_scalar(); }));
- auto values = target.values;
+ auto values = batch.values;
Review Comment:
```suggestion
auto values = std::move(batch.values);
```
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -898,55 +897,35 @@ class HashJoinNode : public ExecNode {
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"batch.length", batch.length}});
- Status status = side == 0 ? OnProbeSideBatch(thread_index,
std::move(batch))
- : OnBuildSideBatch(thread_index,
std::move(batch));
-
- if (!status.ok()) {
- StopProducing();
- ErrorIfNotOk(status);
- return;
- }
+ if (side == 0)
+ RETURN_NOT_OK(OnProbeSideBatch(thread_index, std::move(batch)));
+ else
+ RETURN_NOT_OK(OnBuildSideBatch(thread_index, std::move(batch)));
if (batch_count_[side].Increment()) {
- status = side == 0 ? OnProbeSideFinished(thread_index)
- : OnBuildSideFinished(thread_index);
-
- if (!status.ok()) {
- StopProducing();
- ErrorIfNotOk(status);
- return;
- }
+ return side == 0 ? OnProbeSideFinished(thread_index)
+ : OnBuildSideFinished(thread_index);
}
+ return Status::OK();
}
- void ErrorReceived(ExecNode* input, Status error) override {
- EVENT(span_, "ErrorReceived", {{"error", error.message()}});
- DCHECK_EQ(input, inputs_[0]);
- StopProducing();
- outputs_[0]->ErrorReceived(this, std::move(error));
- }
-
- void InputFinished(ExecNode* input, int total_batches) override {
+ Status InputFinished(ExecNode* input, int total_batches) override {
ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) !=
inputs_.end());
size_t thread_index = plan_->GetThreadIndex();
int side = (input == inputs_[0]) ? 0 : 1;
EVENT(span_, "InputFinished", {{"side", side}, {"batches.length",
total_batches}});
if (batch_count_[side].SetTotal(total_batches)) {
- Status status = side == 0 ? OnProbeSideFinished(thread_index)
- : OnBuildSideFinished(thread_index);
-
- if (!status.ok()) {
- StopProducing();
- ErrorIfNotOk(status);
- return;
- }
+ return side == 0 ? OnProbeSideFinished(thread_index)
+ : OnBuildSideFinished(thread_index);
}
+ return Status::OK();
}
Status Init() override {
RETURN_NOT_OK(ExecNode::Init());
+
Review Comment:
Was this added blank line intentional?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]