This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9a71b083773 branch-3.1: [fix](shuffle) EOF iff all channels done #56731 (#56833)
9a71b083773 is described below

commit 9a71b08377344f0eac3f97ec43c6068b65ca6c55
Author: Gabriel <[email protected]>
AuthorDate: Mon Oct 13 12:07:46 2025 +0800

    branch-3.1: [fix](shuffle) EOF iff all channels done #56731 (#56833)
    
    picked from #56731
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 15 ++++++++++++++-
 be/src/pipeline/pipeline.cpp                    |  4 ++--
 be/src/pipeline/pipeline.h                      |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp   |  2 +-
 be/src/pipeline/pipeline_task.cpp               |  4 ++--
 be/src/pipeline/pipeline_task.h                 |  6 +++++-
 6 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 6280f77cb4c..cf1542dd286 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -606,7 +606,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         local_state._serializer.reset_block();
         for (auto& channel : local_state.channels) {
             Status st = channel->close(state);
-            if (!st.ok() && final_st.ok()) {
+            /**
+             * Consider this case below:
+             *
+             *                 +--- Channel0 (Running)
+             *                 |
+             * ExchangeSink ---+--- Channel1 (EOF)
+             *                 |
+             *                 +--- Channel2 (Running)
+             *
+             * Channel1 is EOF now and return `END_OF_FILE` here. However, Channel0 and Channel2
+             * still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive
+             * blocks including EOS signal. So we must ensure to return EOF iff all channels are EOF.
+             */
+            if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) {
                 final_st = st;
             }
         }
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 7397c3221d5..2d418af3a14 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -108,7 +108,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
     return Status::OK();
 }
 
-void Pipeline::make_all_runnable() {
+void Pipeline::make_all_runnable(PipelineId wake_by) {
     DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
         auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                 "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -121,7 +121,7 @@ void Pipeline::make_all_runnable() {
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
-                task->set_wake_up_early();
+                task->set_wake_up_early(wake_by);
             }
         }
         for (auto* task : _tasks) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index abfe883a804..9981a92f98b 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -102,7 +102,7 @@ public:
         _tasks[i] = task;
     }
 
-    void make_all_runnable();
+    void make_all_runnable(PipelineId wake_by);
 
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 4a069483dc3..e814e3a1dee 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1750,7 +1750,7 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
     if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
         if (_dag.contains(pipeline_id)) {
             for (auto dep : _dag[pipeline_id]) {
-                _pip_id_to_pipeline[dep]->make_all_runnable();
+                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
             }
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 04f869007da..592a1563ace 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -593,9 +593,9 @@ std::string PipelineTask::debug_string() {
     fmt::format_to(
             debug_string_buffer,
             "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized = {}, dry run = "
-            "{}, _wake_up_early = {}, is running = {}]",
+            "{}, _wake_up_early = {}, _wake_up_by = {}, is running = {}]",
             (void*)this, _index, _opened, _eos, _finalized, _dry_run, _wake_up_early.load(),
-            is_running());
+            _wake_by, is_running());
     std::unique_lock<std::mutex> lc(_dependency_lock);
     auto* cur_blocked_dep = _blocked_dep;
     auto fragment = _fragment_context.lock();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 645879b7043..47a3fbc31ea 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -135,7 +135,10 @@ public:
     int task_id() const { return _index; };
     bool is_finalized() const { return _finalized; }
 
-    void set_wake_up_early() { _wake_up_early = true; }
+    void set_wake_up_early(PipelineId wake_by = -1) {
+        _wake_up_early = true;
+        _wake_by = wake_by;
+    }
 
     void clear_blocking_state() {
         auto fragment = _fragment_context.lock();
@@ -319,6 +322,7 @@ private:
     const std::string _pipeline_name;
     // PipelineTask maybe hold by TaskQueue
     std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
+    int _wake_by = -1;
 };
 
 using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to