This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9a71b083773 branch-3.1: [fix](shuffle) EOF iff all channels done #56731 (#56833)
9a71b083773 is described below
commit 9a71b08377344f0eac3f97ec43c6068b65ca6c55
Author: Gabriel <[email protected]>
AuthorDate: Mon Oct 13 12:07:46 2025 +0800
branch-3.1: [fix](shuffle) EOF iff all channels done #56731 (#56833)
picked from #56731
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 15 ++++++++++++++-
be/src/pipeline/pipeline.cpp | 4 ++--
be/src/pipeline/pipeline.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 4 ++--
be/src/pipeline/pipeline_task.h | 6 +++++-
6 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 6280f77cb4c..cf1542dd286 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -606,7 +606,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
local_state._serializer.reset_block();
for (auto& channel : local_state.channels) {
Status st = channel->close(state);
- if (!st.ok() && final_st.ok()) {
+ /**
+ * Consider this case below:
+ *
+ * +--- Channel0 (Running)
+ * |
+ * ExchangeSink ---+--- Channel1 (EOF)
+ * |
+ * +--- Channel2 (Running)
+ *
+ * Channel1 is EOF now and return `END_OF_FILE` here. However,
Channel0 and Channel2
+ * still need new data. If ExchangeSink returns EOF, downstream
tasks will no longer receive
+ * blocks including EOS signal. So we must ensure to return EOF
iff all channels are EOF.
+ */
+ if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok())
{
final_st = st;
}
}
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 7397c3221d5..2d418af3a14 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -108,7 +108,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
return Status::OK();
}
-void Pipeline::make_all_runnable() {
+void Pipeline::make_all_runnable(PipelineId wake_by) {
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
auto pipeline_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -121,7 +121,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
- task->set_wake_up_early();
+ task->set_wake_up_early(wake_by);
}
}
for (auto* task : _tasks) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index abfe883a804..9981a92f98b 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -102,7 +102,7 @@ public:
_tasks[i] = task;
}
- void make_all_runnable();
+ void make_all_runnable(PipelineId wake_by);
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 4a069483dc3..e814e3a1dee 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1750,7 +1750,7 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId
pipeline_id) {
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
- _pip_id_to_pipeline[dep]->make_all_runnable();
+ _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
}
}
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 04f869007da..592a1563ace 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -593,9 +593,9 @@ std::string PipelineTask::debug_string() {
fmt::format_to(
debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized =
{}, dry run = "
- "{}, _wake_up_early = {}, is running = {}]",
+ "{}, _wake_up_early = {}, _wake_up_by = {}, is running = {}]",
(void*)this, _index, _opened, _eos, _finalized, _dry_run,
_wake_up_early.load(),
- is_running());
+ _wake_by, is_running());
std::unique_lock<std::mutex> lc(_dependency_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 645879b7043..47a3fbc31ea 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -135,7 +135,10 @@ public:
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }
- void set_wake_up_early() { _wake_up_early = true; }
+ void set_wake_up_early(PipelineId wake_by = -1) {
+ _wake_up_early = true;
+ _wake_by = wake_by;
+ }
void clear_blocking_state() {
auto fragment = _fragment_context.lock();
@@ -319,6 +322,7 @@ private:
const std::string _pipeline_name;
// PipelineTask maybe hold by TaskQueue
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
+ int _wake_by = -1;
};
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]