save-buffer commented on code in PR #13143:
URL: https://github.com/apache/arrow/pull/13143#discussion_r882914624


##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -96,64 +96,51 @@ struct SourceNode : ExecNode {
       options.executor = executor;
       options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
-    finished_ = Loop([this, executor, options] {
-                  std::unique_lock<std::mutex> lock(mutex_);
-                  int total_batches = batch_count_++;
-                  if (stop_requested_) {
-                    return 
Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
-                  }
-                  lock.unlock();
-
-                  return generator_().Then(
-                      [=](const util::optional<ExecBatch>& maybe_batch)
-                          -> Future<ControlFlow<int>> {
-                        std::unique_lock<std::mutex> lock(mutex_);
-                        if (IsIterationEnd(maybe_batch) || stop_requested_) {
-                          stop_requested_ = true;
-                          return Break(total_batches);
-                        }
-                        lock.unlock();
-                        ExecBatch batch = std::move(*maybe_batch);
-
-                        if (executor) {
-                          auto status = task_group_.AddTask(
-                              [this, executor, batch]() -> Result<Future<>> {
-                                return executor->Submit([=]() {
-                                  outputs_[0]->InputReceived(this, 
std::move(batch));
-                                  return Status::OK();
-                                });
-                              });
-                          if (!status.ok()) {
-                            outputs_[0]->ErrorReceived(this, 
std::move(status));
-                            return Break(total_batches);
-                          }
-                        } else {
-                          outputs_[0]->InputReceived(this, std::move(batch));
-                        }
-                        lock.lock();
-                        if (!backpressure_future_.is_finished()) {
-                          EVENT(span_, "Source paused due to backpressure");
-                          return backpressure_future_.Then(
-                              []() -> ControlFlow<int> { return Continue(); });
-                        }
-                        return 
Future<ControlFlow<int>>::MakeFinished(Continue());
-                      },
-                      [=](const Status& error) -> ControlFlow<int> {
-                        // NB: ErrorReceived is independent of InputFinished, 
but
-                        // ErrorReceived will usually prompt StopProducing 
which will
-                        // prompt InputFinished. ErrorReceived may still be 
called from a
-                        // node which was requested to stop (indeed, the 
request to stop
-                        // may prompt an error).
-                        std::unique_lock<std::mutex> lock(mutex_);
-                        stop_requested_ = true;
-                        lock.unlock();
-                        outputs_[0]->ErrorReceived(this, error);
-                        return Break(total_batches);
-                      },
-                      options);
-                }).Then([&](int total_batches) {
-      outputs_[0]->InputFinished(this, total_batches);
-      return task_group_.End();
+    Loop([this, options] {
+      std::unique_lock<std::mutex> lock(mutex_);
+      int total_batches = batch_count_++;
+      if (stop_requested_) {
+        return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
+      }
+      lock.unlock();
+
+      return generator_().Then(
+          [=](const util::optional<ExecBatch>& maybe_batch) -> 
Future<ControlFlow<int>> {
+            std::unique_lock<std::mutex> lock(mutex_);
+            if (IsIterationEnd(maybe_batch) || stop_requested_) {
+              stop_requested_ = true;
+              return Break(total_batches);
+            }
+            lock.unlock();
+            ExecBatch batch = std::move(*maybe_batch);
+            RETURN_NOT_OK(plan_->ScheduleTask([=]() {
+              outputs_[0]->InputReceived(this, std::move(batch));
+              return Status::OK();
+            }));
+            lock.lock();
+            if (!backpressure_future_.is_finished()) {
+              EVENT(span_, "Source paused due to backpressure");
+              return backpressure_future_.Then(
+                  []() -> ControlFlow<int> { return Continue(); });
+            }
+            return Future<ControlFlow<int>>::MakeFinished(Continue());
+          },
+          [=](const Status& error) -> ControlFlow<int> {
+            // NB: ErrorReceived is independent of InputFinished, but
+            // ErrorReceived will usually prompt StopProducing which will
+            // prompt InputFinished. ErrorReceived may still be called from a
+            // node which was requested to stop (indeed, the request to stop
+            // may prompt an error).

Review Comment:
   Good point, I didn't understand this comment anyway 😛 
   This whole node gets rewritten in Part 2 anyway, but yes I'll delete the 
comment here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to