lidavidm commented on a change in pull request #11274:
URL: https://github.com/apache/arrow/pull/11274#discussion_r719616956
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
PushGenerator<util::optional<ExecBatch>>::Producer producer_;
};
-// A sink node that accumulates inputs, then sorts them before emitting them.
struct OrderBySinkNode final : public SinkNode {
- OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions
sort_options,
+ OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<util::optional<ExecBatch>>* generator)
- : SinkNode(plan, std::move(inputs), generator),
- sort_options_(std::move(sort_options)) {}
+ : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
const char* kind_name() const override { return "OrderBySinkNode"; }
- static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
- const ExecNodeOptions& options) {
+ // A sink node that accumulates inputs, then sorts them before emitting them.
+ static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
const auto& sink_options = checked_cast<const
OrderBySinkNodeOptions&>(options);
- return plan->EmplaceNode<OrderBySinkNode>(
- plan, std::move(inputs), sink_options.sort_options,
sink_options.generator);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+ sink_options.sort_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
+ }
+
+ // A sink node that receives inputs and then compute top_k/bottom_k.
+ static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+ const auto& sink_options = checked_cast<const
SelectKSinkNodeOptions&>(options);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSelectK(plan->exec_context(),
inputs[0]->output_schema(),
+ sink_options.select_k_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
}
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- // Accumulate data
- {
- std::unique_lock<std::mutex> lock(mutex_);
- auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-
plan()->exec_context()->memory_pool());
- if (ErrorIfNotOk(maybe_batch.status())) return;
- batches_.push_back(maybe_batch.MoveValueUnsafe());
- }
+ auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+
plan()->exec_context()->memory_pool());
+ if (ErrorIfNotOk(maybe_batch.status())) return;
+ auto record_batch = maybe_batch.MoveValueUnsafe();
+ Status status = impl_->InputReceived(std::move(record_batch));
+ if (!status.ok()) {
Review comment:
This should use the same pattern as above.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
PushGenerator<util::optional<ExecBatch>>::Producer producer_;
};
-// A sink node that accumulates inputs, then sorts them before emitting them.
struct OrderBySinkNode final : public SinkNode {
- OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions
sort_options,
+ OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<util::optional<ExecBatch>>* generator)
- : SinkNode(plan, std::move(inputs), generator),
- sort_options_(std::move(sort_options)) {}
+ : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
const char* kind_name() const override { return "OrderBySinkNode"; }
- static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
- const ExecNodeOptions& options) {
+ // A sink node that accumulates inputs, then sorts them before emitting them.
+ static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
const auto& sink_options = checked_cast<const
OrderBySinkNodeOptions&>(options);
- return plan->EmplaceNode<OrderBySinkNode>(
- plan, std::move(inputs), sink_options.sort_options,
sink_options.generator);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+ sink_options.sort_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
+ }
+
+ // A sink node that receives inputs and then compute top_k/bottom_k.
+ static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+ const auto& sink_options = checked_cast<const
SelectKSinkNodeOptions&>(options);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSelectK(plan->exec_context(),
inputs[0]->output_schema(),
+ sink_options.select_k_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
}
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- // Accumulate data
- {
- std::unique_lock<std::mutex> lock(mutex_);
- auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-
plan()->exec_context()->memory_pool());
- if (ErrorIfNotOk(maybe_batch.status())) return;
- batches_.push_back(maybe_batch.MoveValueUnsafe());
- }
+ auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+
plan()->exec_context()->memory_pool());
+ if (ErrorIfNotOk(maybe_batch.status())) return;
Review comment:
It seems a little hard to write a test case for this, however.
Practically this can only come up if there's an allocation failure.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
PushGenerator<util::optional<ExecBatch>>::Producer producer_;
};
-// A sink node that accumulates inputs, then sorts them before emitting them.
struct OrderBySinkNode final : public SinkNode {
- OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions
sort_options,
+ OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<util::optional<ExecBatch>>* generator)
- : SinkNode(plan, std::move(inputs), generator),
- sort_options_(std::move(sort_options)) {}
+ : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
const char* kind_name() const override { return "OrderBySinkNode"; }
- static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
- const ExecNodeOptions& options) {
+ // A sink node that accumulates inputs, then sorts them before emitting them.
+ static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
const auto& sink_options = checked_cast<const
OrderBySinkNodeOptions&>(options);
- return plan->EmplaceNode<OrderBySinkNode>(
- plan, std::move(inputs), sink_options.sort_options,
sink_options.generator);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+ sink_options.sort_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
+ }
+
+ // A sink node that receives inputs and then compute top_k/bottom_k.
+ static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+ const auto& sink_options = checked_cast<const
SelectKSinkNodeOptions&>(options);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSelectK(plan->exec_context(),
inputs[0]->output_schema(),
+ sink_options.select_k_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
}
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- // Accumulate data
- {
- std::unique_lock<std::mutex> lock(mutex_);
- auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-
plan()->exec_context()->memory_pool());
- if (ErrorIfNotOk(maybe_batch.status())) return;
- batches_.push_back(maybe_batch.MoveValueUnsafe());
- }
+ auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+
plan()->exec_context()->memory_pool());
+ if (ErrorIfNotOk(maybe_batch.status())) return;
Review comment:
Hmm, I realize this error was in the original impl, but ErrorIfNotOk
here basically just throws away the error since there are no outputs. Instead,
we should do the same thing as Finish.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
PushGenerator<util::optional<ExecBatch>>::Producer producer_;
};
-// A sink node that accumulates inputs, then sorts them before emitting them.
struct OrderBySinkNode final : public SinkNode {
- OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions
sort_options,
+ OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<util::optional<ExecBatch>>* generator)
- : SinkNode(plan, std::move(inputs), generator),
- sort_options_(std::move(sort_options)) {}
+ : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
const char* kind_name() const override { return "OrderBySinkNode"; }
- static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
- const ExecNodeOptions& options) {
+ // A sink node that accumulates inputs, then sorts them before emitting them.
+ static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
const auto& sink_options = checked_cast<const
OrderBySinkNodeOptions&>(options);
- return plan->EmplaceNode<OrderBySinkNode>(
- plan, std::move(inputs), sink_options.sort_options,
sink_options.generator);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+ sink_options.sort_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
+ }
+
+ // A sink node that receives inputs and then compute top_k/bottom_k.
+ static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*>
inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+ const auto& sink_options = checked_cast<const
SelectKSinkNodeOptions&>(options);
+ ARROW_ASSIGN_OR_RAISE(
+ std::unique_ptr<OrderByImpl> impl,
+ OrderByImpl::MakeSelectK(plan->exec_context(),
inputs[0]->output_schema(),
+ sink_options.select_k_options));
+ return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs),
std::move(impl),
+ sink_options.generator);
}
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- // Accumulate data
- {
- std::unique_lock<std::mutex> lock(mutex_);
- auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-
plan()->exec_context()->memory_pool());
- if (ErrorIfNotOk(maybe_batch.status())) return;
- batches_.push_back(maybe_batch.MoveValueUnsafe());
- }
+ auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+
plan()->exec_context()->memory_pool());
+ if (ErrorIfNotOk(maybe_batch.status())) return;
Review comment:
```suggestion
if (ErrorIfNotOk(maybe_batch.status())) {
StopProducing();
bool cancelled = input_counter_.Cancel();
DCHECK(cancelled);
finished_.MarkFinished(maybe_batch.status());
return;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]