This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e52956bb3e4 [fix](shuffle) EOF iff all channels done (#56731)
e52956bb3e4 is described below
commit e52956bb3e4e7a4bf58de7c8d2cb9689663ffe16
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 9 14:23:43 2025 +0800
[fix](shuffle) EOF iff all channels done (#56731)
Consider this case below:
```
                +--- Channel0 (Running)
                |
ExchangeSink ---+--- Channel1 (EOF)
                |
                +--- Channel2 (Running)
```
Channel1 has reached EOF and returns `END_OF_FILE` here. However, Channel0
and Channel2 still need new data. If ExchangeSink returned EOF at this point,
downstream tasks would no longer receive blocks, including the EOS signal. So
we must ensure that EOF is returned iff all channels are EOF.
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 15 ++++++++++++++-
be/src/pipeline/pipeline.cpp | 4 ++--
be/src/pipeline/pipeline.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 7 ++++---
be/src/pipeline/pipeline_task.h | 6 +++++-
6 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index d3b8152bd6b..c32860811af 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -514,7 +514,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
for (auto& channel : local_state.channels) {
COUNTER_UPDATE(local_state.memory_used_counter(),
-channel->mem_usage());
Status st = channel->close(state);
- if (!st.ok() && final_st.ok()) {
+ /**
+ * Consider this case below:
+ *
+ * +--- Channel0 (Running)
+ * |
+ * ExchangeSink ---+--- Channel1 (EOF)
+ * |
+ * +--- Channel2 (Running)
+ *
+ * Channel1 is EOF now and return `END_OF_FILE` here. However,
Channel0 and Channel2
+ * still need new data. If ExchangeSink returns EOF, downstream
tasks will no longer receive
+ * blocks including EOS signal. So we must ensure to return EOF
iff all channels are EOF.
+ */
+ if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok())
{
final_st = st;
}
}
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 49d86bf4eb9..774561bbe37 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -111,7 +111,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
return Status::OK();
}
-void Pipeline::make_all_runnable() {
+void Pipeline::make_all_runnable(PipelineId wake_by) {
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
auto pipeline_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
@@ -124,7 +124,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
- task->set_wake_up_early();
+ task->set_wake_up_early(wake_by);
}
}
for (auto* task : _tasks) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7fff24cf8d9..91ac4dd3cd6 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -104,7 +104,7 @@ public:
_tasks[i] = task;
}
- void make_all_runnable();
+ void make_all_runnable(PipelineId wake_by);
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index d59bffe4ad9..8de7a6eec28 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1782,7 +1782,7 @@ void
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
- _pip_id_to_pipeline[dep]->make_all_runnable();
+ _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
}
}
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index e069cb840d1..5a86dd2626a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -723,10 +723,11 @@ std::string PipelineTask::debug_string() {
fmt::format_to(debug_string_buffer,
"PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry
run = "
- "{}, _wake_up_early = {}, time elapsed since last state
changing = {}s, spilling"
- " = {}, is running = {}]",
+ "{}, _wake_up_early = {}, _wake_up_by = {}, time elapsed
since last state "
+ "changing = {}s, spilling = {}, is running = {}]",
_index, _opened, _eos, _to_string(_exec_state), _dry_run,
_wake_up_early.load(),
- _state_change_watcher.elapsed_time() / NANOS_PER_SEC,
_spilling, is_running());
+ _wake_by, _state_change_watcher.elapsed_time() /
NANOS_PER_SEC, _spilling,
+ is_running());
std::unique_lock<std::mutex> lc(_dependency_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 8c11cb87b61..08b57e0c630 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -127,7 +127,10 @@ public:
int task_id() const { return _index; };
bool is_finalized() const { return _exec_state == State::FINALIZED; }
- void set_wake_up_early() { _wake_up_early = true; }
+ void set_wake_up_early(PipelineId wake_by = -1) {
+ _wake_up_early = true;
+ _wake_by = wake_by;
+ }
// Execution phase should be terminated. This is called if this task is
canceled or waken up early.
void terminate();
@@ -309,6 +312,7 @@ private:
MonotonicStopWatch _state_change_watcher;
std::atomic<bool> _spilling = false;
const std::string _pipeline_name;
+ int _wake_by = -1;
};
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]