save-buffer commented on code in PR #13143:
URL: https://github.com/apache/arrow/pull/13143#discussion_r882914624
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -96,64 +96,51 @@ struct SourceNode : ExecNode {
options.executor = executor;
options.should_schedule = ShouldSchedule::IfDifferentExecutor;
}
- finished_ = Loop([this, executor, options] {
- std::unique_lock<std::mutex> lock(mutex_);
- int total_batches = batch_count_++;
- if (stop_requested_) {
- return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
- }
- lock.unlock();
-
- return generator_().Then(
- [=](const util::optional<ExecBatch>& maybe_batch)
- -> Future<ControlFlow<int>> {
- std::unique_lock<std::mutex> lock(mutex_);
- if (IsIterationEnd(maybe_batch) || stop_requested_) {
- stop_requested_ = true;
- return Break(total_batches);
- }
- lock.unlock();
- ExecBatch batch = std::move(*maybe_batch);
-
- if (executor) {
- auto status = task_group_.AddTask(
- [this, executor, batch]() -> Result<Future<>> {
- return executor->Submit([=]() {
- outputs_[0]->InputReceived(this, std::move(batch));
- return Status::OK();
- });
- });
- if (!status.ok()) {
- outputs_[0]->ErrorReceived(this, std::move(status));
- return Break(total_batches);
- }
- } else {
- outputs_[0]->InputReceived(this, std::move(batch));
- }
- lock.lock();
- if (!backpressure_future_.is_finished()) {
- EVENT(span_, "Source paused due to backpressure");
- return backpressure_future_.Then(
- []() -> ControlFlow<int> { return Continue(); });
- }
- return Future<ControlFlow<int>>::MakeFinished(Continue());
- },
- [=](const Status& error) -> ControlFlow<int> {
- // NB: ErrorReceived is independent of InputFinished, but
- // ErrorReceived will usually prompt StopProducing which will
- // prompt InputFinished. ErrorReceived may still be called from a
- // node which was requested to stop (indeed, the request to stop
- // may prompt an error).
- std::unique_lock<std::mutex> lock(mutex_);
- stop_requested_ = true;
- lock.unlock();
- outputs_[0]->ErrorReceived(this, error);
- return Break(total_batches);
- },
- options);
- }).Then([&](int total_batches) {
- outputs_[0]->InputFinished(this, total_batches);
- return task_group_.End();
+ Loop([this, options] {
+ std::unique_lock<std::mutex> lock(mutex_);
+ int total_batches = batch_count_++;
+ if (stop_requested_) {
+ return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
+ }
+ lock.unlock();
+
+ return generator_().Then(
+ [=](const util::optional<ExecBatch>& maybe_batch) -> Future<ControlFlow<int>> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (IsIterationEnd(maybe_batch) || stop_requested_) {
+ stop_requested_ = true;
+ return Break(total_batches);
+ }
+ lock.unlock();
+ ExecBatch batch = std::move(*maybe_batch);
+ RETURN_NOT_OK(plan_->ScheduleTask([=]() {
+ outputs_[0]->InputReceived(this, std::move(batch));
+ return Status::OK();
+ }));
+ lock.lock();
+ if (!backpressure_future_.is_finished()) {
+ EVENT(span_, "Source paused due to backpressure");
+ return backpressure_future_.Then(
+ []() -> ControlFlow<int> { return Continue(); });
+ }
+ return Future<ControlFlow<int>>::MakeFinished(Continue());
+ },
+ [=](const Status& error) -> ControlFlow<int> {
+ // NB: ErrorReceived is independent of InputFinished, but
+ // ErrorReceived will usually prompt StopProducing which will
+ // prompt InputFinished. ErrorReceived may still be called from a
+ // node which was requested to stop (indeed, the request to stop
+ // may prompt an error).
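
For context on the hunk above: batch delivery now goes through plan_->ScheduleTask(...) instead of the node-local task_group_ plus executor->Submit(...) pair, so the plan rather than each source node owns task tracking. Below is a minimal standalone sketch of that ownership shift using plain standard-library threads; MiniPlan, ScheduleTask, and WaitForFinish are illustrative names only, not Arrow's actual API or implementation.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Stand-in for the plan object: it owns task submission and tracks
// outstanding tasks, so a "source node" no longer needs its own task
// group or executor handle.
class MiniPlan {
 public:
  // Submit a task; the plan records it until completion.
  // (Spawns a thread per task for simplicity; Arrow would use an executor.)
  void ScheduleTask(std::function<void()> task) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++outstanding_;
    workers_.emplace_back([this, task = std::move(task)] {
      task();
      std::lock_guard<std::mutex> lock(mutex_);
      if (--outstanding_ == 0) finished_cv_.notify_all();
    });
  }

  // Block until every scheduled task has finished, then reap the threads.
  void WaitForFinish() {
    std::unique_lock<std::mutex> lock(mutex_);
    finished_cv_.wait(lock, [this] { return outstanding_ == 0; });
    for (auto& t : workers_) t.join();
    workers_.clear();
  }

 private:
  std::mutex mutex_;
  std::condition_variable finished_cv_;
  int outstanding_ = 0;
  std::vector<std::thread> workers_;
};

int main() {
  MiniPlan plan;
  for (int i = 0; i < 4; ++i) {
    plan.ScheduleTask([i] { std::cout << "batch " << i << " delivered\n"; });
  }
  plan.WaitForFinish();
  return 0;
}

In the real node the call can fail and the caller must propagate that (hence the RETURN_NOT_OK in the hunk), but the shape is the same: the node only submits work, and the plan decides how it runs and when everything is done.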
Review Comment:
Good point; I didn't understand this comment anyway 😛
This whole node gets rewritten in Part 2, but yes, I'll delete the comment here.