bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666201112
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
const char* kind_name() override { return "SourceNode"; }
- static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
- void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
- void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
- void InputFinished(ExecNode*, int) override { NoInputs(); }
+ [[noreturn]] static void NoInputs() {
+ DCHECK(false) << "no inputs; this should never be called";
+ std::abort();
+ }
+ [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override {
NoInputs(); }
+ [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
Status StartProducing() override {
- if (finished_) {
- return Status::Invalid("Restarted SourceNode '", label(), "'");
+ DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+ CallbackOptions options;
+ if (auto executor = plan()->exec_context()->executor()) {
+ // These options will transfer execution to the desired Executor if
necessary.
+ // This can happen for in-memory scans where batches didn't require
+ // any CPU work to decode. Otherwise, parsing etc should have already
+ // placed us on the desired Executor and no queues will be pushed
to.
+ options.executor = executor;
+ options.should_schedule = ShouldSchedule::IfDifferentExecutor;
}
- finished_fut_ =
- Loop([this] {
- std::unique_lock<std::mutex> lock(mutex_);
- int seq = next_batch_index_++;
- if (finished_) {
- return Future<ControlFlow<int>>::MakeFinished(Break(seq));
- }
- lock.unlock();
-
- return generator_().Then(
- [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
- std::unique_lock<std::mutex> lock(mutex_);
- if (!batch || finished_) {
- finished_ = true;
- return Break(seq);
- }
- lock.unlock();
-
- // TODO check if we are on the desired Executor and transfer
if not.
- // This can happen for in-memory scans where batches didn't
require
- // any CPU work to decode. Otherwise, parsing etc should have
already
- // been placed us on the thread pool
- outputs_[0]->InputReceived(this, seq, *batch);
- return Continue();
- },
- [=](const Status& error) -> ControlFlow<int> {
- std::unique_lock<std::mutex> lock(mutex_);
- if (!finished_) {
- finished_ = true;
+ finished_ = Loop([this, options] {
Review comment:
Is there a reason not to use Loop, though?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]