This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e52956bb3e4 [fix](shuffle) EOF iff all channels done (#56731)
e52956bb3e4 is described below

commit e52956bb3e4e7a4bf58de7c8d2cb9689663ffe16
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 9 14:23:43 2025 +0800

    [fix](shuffle) EOF iff all channels done (#56731)
    
    Consider this case below:
    
    ```
                    +--- Channel0 (Running)
                    |
    ExchangeSink ---+--- Channel1 (EOF)
                    |
                    +--- Channel2 (Running)
    ```
    
    Channel1 has reached EOF and returns `END_OF_FILE` here. However, Channel0
    and Channel2 still need new data. If ExchangeSink returned EOF at this
    point, downstream tasks would no longer receive any blocks, including the
    EOS signal. So we must ensure that EOF is returned iff all channels are EOF.
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 15 ++++++++++++++-
 be/src/pipeline/pipeline.cpp                    |  4 ++--
 be/src/pipeline/pipeline.h                      |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp   |  2 +-
 be/src/pipeline/pipeline_task.cpp               |  7 ++++---
 be/src/pipeline/pipeline_task.h                 |  6 +++++-
 6 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index d3b8152bd6b..c32860811af 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -514,7 +514,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         for (auto& channel : local_state.channels) {
             COUNTER_UPDATE(local_state.memory_used_counter(), 
-channel->mem_usage());
             Status st = channel->close(state);
-            if (!st.ok() && final_st.ok()) {
+            /**
+             * Consider this case below:
+             *
+             *                 +--- Channel0 (Running)
+             *                 |
+             * ExchangeSink ---+--- Channel1 (EOF)
+             *                 |
+             *                 +--- Channel2 (Running)
+             *
+             * Channel1 is EOF now and return `END_OF_FILE` here. However, 
Channel0 and Channel2
+             * still need new data. If ExchangeSink returns EOF, downstream 
tasks will no longer receive
+             * blocks including EOS signal. So we must ensure to return EOF 
iff all channels are EOF.
+             */
+            if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) 
{
                 final_st = st;
             }
         }
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 49d86bf4eb9..774561bbe37 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -111,7 +111,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
     return Status::OK();
 }
 
-void Pipeline::make_all_runnable() {
+void Pipeline::make_all_runnable(PipelineId wake_by) {
     DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
         auto pipeline_id = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                 "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -124,7 +124,7 @@ void Pipeline::make_all_runnable() {
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
-                task->set_wake_up_early();
+                task->set_wake_up_early(wake_by);
             }
         }
         for (auto* task : _tasks) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7fff24cf8d9..91ac4dd3cd6 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -104,7 +104,7 @@ public:
         _tasks[i] = task;
     }
 
-    void make_all_runnable();
+    void make_all_runnable(PipelineId wake_by);
 
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d59bffe4ad9..8de7a6eec28 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1782,7 +1782,7 @@ void 
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
         if (_dag.contains(pipeline_id)) {
             for (auto dep : _dag[pipeline_id]) {
-                _pip_id_to_pipeline[dep]->make_all_runnable();
+                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
             }
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index e069cb840d1..5a86dd2626a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -723,10 +723,11 @@ std::string PipelineTask::debug_string() {
 
     fmt::format_to(debug_string_buffer,
                    "PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry 
run = "
-                   "{}, _wake_up_early = {}, time elapsed since last state 
changing = {}s, spilling"
-                   " = {}, is running = {}]",
+                   "{}, _wake_up_early = {}, _wake_up_by = {}, time elapsed 
since last state "
+                   "changing = {}s, spilling = {}, is running = {}]",
                    _index, _opened, _eos, _to_string(_exec_state), _dry_run, 
_wake_up_early.load(),
-                   _state_change_watcher.elapsed_time() / NANOS_PER_SEC, 
_spilling, is_running());
+                   _wake_by, _state_change_watcher.elapsed_time() / 
NANOS_PER_SEC, _spilling,
+                   is_running());
     std::unique_lock<std::mutex> lc(_dependency_lock);
     auto* cur_blocked_dep = _blocked_dep;
     auto fragment = _fragment_context.lock();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 8c11cb87b61..08b57e0c630 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -127,7 +127,10 @@ public:
     int task_id() const { return _index; };
     bool is_finalized() const { return _exec_state == State::FINALIZED; }
 
-    void set_wake_up_early() { _wake_up_early = true; }
+    void set_wake_up_early(PipelineId wake_by = -1) {
+        _wake_up_early = true;
+        _wake_by = wake_by;
+    }
 
     // Execution phase should be terminated. This is called if this task is 
canceled or waken up early.
     void terminate();
@@ -309,6 +312,7 @@ private:
     MonotonicStopWatch _state_change_watcher;
     std::atomic<bool> _spilling = false;
     const std::string _pipeline_name;
+    int _wake_by = -1;
 };
 
 using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to