This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 977c1f1134c [Bug](pipeline) do not treat BLOCKED->FINISHED as error
when wake_up_early is true (#61989)
977c1f1134c is described below
commit 977c1f1134c6ce93fa6a5e2cb6675515ed3ae785
Author: Pxl <[email protected]>
AuthorDate: Fri Apr 3 16:33:10 2026 +0800
[Bug](pipeline) do not treat BLOCKED->FINISHED as error when wake_up_early
is true (#61989)
This pull request improves the state transition logic for `PipelineTask`
to handle rare race conditions when a delayed `wake_up()` call arrives
after a task has already finished or finalized. It introduces an
extended state transition table for the `_wake_up_early` scenario,
ensures that a late, newly-legal transition never moves the state
backwards (transitions outside the table are still rejected), and
adds comprehensive unit tests for these new behaviors.
**State transition logic improvements:**
* Added a new `WAKE_UP_EARLY_LEGAL_STATE_TRANSITION` table to allow
certain additional state transitions when `_wake_up_early` is set,
specifically permitting `BLOCKED→FINISHED`, `FINISHED→RUNNABLE`, and
`FINALIZED→RUNNABLE` transitions under this mode.
* Updated `_state_transition` to use the appropriate transition table
based on `_wake_up_early`, and to treat `FINISHED/FINALIZED→RUNNABLE` as
a legal but no-op transition (state remains unchanged).
**Handling of delayed wake-up calls:**
* Modified `wake_up()` to avoid resubmitting a task to the scheduler if
a delayed wake-up arrives after the task has already finished or
finalized, preventing unintended re-execution.
**Documentation and testing:**
* Expanded the state machine documentation in `PipelineTask` to describe
the new transitions allowed by `_wake_up_early`.
* Added thorough unit tests to verify both normal and `_wake_up_early`
state transitions, including edge cases for no-op transitions and
correct handling of delayed wake-ups.
---
be/src/exec/pipeline/pipeline_task.cpp | 12 +++++-
be/src/exec/pipeline/pipeline_task.h | 17 ++++++++
be/test/exec/pipeline/pipeline_task_test.cpp | 58 +++++++++++++++++++++++++++-
3 files changed, 83 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/pipeline/pipeline_task.cpp
b/be/src/exec/pipeline/pipeline_task.cpp
index 5999854d5b1..0e98c8a2643 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -1063,12 +1063,20 @@ Status PipelineTask::_state_transition(State new_state)
{
}
_task_profile->add_info_string("TaskState", _to_string(new_state));
_task_profile->add_info_string("BlockedByDependency", _blocked_dep ?
_blocked_dep->name() : "");
- if (!LEGAL_STATE_TRANSITION[(int)new_state].contains(_exec_state)) {
+ const auto& table =
+ _wake_up_early ? WAKE_UP_EARLY_LEGAL_STATE_TRANSITION :
LEGAL_STATE_TRANSITION;
+ if (!table[(int)new_state].contains(_exec_state)) {
return Status::InternalError(
"Task state transition from {} to {} is not allowed! Task
info: {}",
_to_string(_exec_state), _to_string(new_state),
debug_string());
}
- _exec_state = new_state;
+ // FINISHED/FINALIZED → RUNNABLE is legal under wake_up_early (delayed
wake_up() arriving
+ // after the task already terminated), but we must not actually move the
state backwards.
+ bool need_move = !((_exec_state == State::FINISHED || _exec_state ==
State::FINALIZED) &&
+ new_state == State::RUNNABLE);
+ if (need_move) {
+ _exec_state = new_state;
+ }
return Status::OK();
}
diff --git a/be/src/exec/pipeline/pipeline_task.h
b/be/src/exec/pipeline/pipeline_task.h
index 999bdb07116..732de08ad9c 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -279,11 +279,19 @@ private:
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
/**
+ * Normal state machine:
*
* INITED -----> RUNNABLE -------------------------+----> FINISHED
---+---> FINALIZED
* ^ | |
* | | |
* +----------- BLOCKED <--------+------------------+
+ *
+ * When _wake_up_early is set by make_all_runnable(), additional
transitions are allowed:
+ * BLOCKED → FINISHED : task skips RUNNABLE, terminates directly
+ * FINISHED → RUNNABLE : delayed wake_up() arrives after task
already finished,
+ * legal but no-op (state stays FINISHED)
+ * FINALIZED → RUNNABLE : same as above but task already finalized,
+ * legal but no-op (state stays FINALIZED)
*/
enum class State : int {
INITED,
@@ -299,6 +307,15 @@ private:
{State::RUNNABLE}, // Target state
is FINISHED
{State::INITED, State::FINISHED}}; // Target state
is FINALIZED
+ // Extended table used when _wake_up_early is true.
+ const std::vector<std::set<State>> WAKE_UP_EARLY_LEGAL_STATE_TRANSITION = {
+ {}, // INITED
+ {State::INITED, State::RUNNABLE, State::BLOCKED, State::FINISHED,
+ State::FINALIZED}, // RUNNABLE (+ FINISHED,
FINALIZED)
+ {State::RUNNABLE, State::FINISHED}, // BLOCKED
+ {State::RUNNABLE, State::BLOCKED}, // FINISHED (+ BLOCKED)
+ {State::INITED, State::FINISHED}}; // FINALIZED
+
std::string _to_string(State state) const {
switch (state) {
case State::INITED:
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp
b/be/test/exec/pipeline/pipeline_task_test.cpp
index fb55afbc9de..8029270ea47 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -479,14 +479,68 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
+ // Test normal LEGAL_STATE_TRANSITION table (with _wake_up_early = false).
+ task->_wake_up_early = false;
for (int i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) {
auto target = (PipelineTask::State)i;
for (int j = 0; j < task->LEGAL_STATE_TRANSITION.size(); j++) {
- task->_exec_state = (PipelineTask::State)j;
+ auto from = (PipelineTask::State)j;
+ task->_exec_state = from;
EXPECT_EQ(task->_state_transition(target).ok(),
-
task->LEGAL_STATE_TRANSITION[i].contains((PipelineTask::State)j));
+ task->LEGAL_STATE_TRANSITION[i].contains(from));
}
}
+
+ // Test WAKE_UP_EARLY_LEGAL_STATE_TRANSITION table.
+ task->_wake_up_early = true;
+ for (int i = 0; i < task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION.size();
i++) {
+ auto target = (PipelineTask::State)i;
+ for (int j = 0; j < task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION.size();
j++) {
+ auto from = (PipelineTask::State)j;
+ task->_exec_state = from;
+ EXPECT_EQ(task->_state_transition(target).ok(),
+
task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION[i].contains(from));
+ }
+ }
+
+ // FINISHED→RUNNABLE under wake_up_early is legal but no-op: state stays
FINISHED.
+ task->_exec_state = PipelineTask::State::FINISHED;
+ EXPECT_TRUE(task->_state_transition(PipelineTask::State::RUNNABLE).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+
+ // FINALIZED→RUNNABLE under wake_up_early is legal but no-op: state stays
FINALIZED.
+ task->_exec_state = PipelineTask::State::FINALIZED;
+ EXPECT_TRUE(task->_state_transition(PipelineTask::State::RUNNABLE).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+
+ // BLOCKED→FINISHED under wake_up_early does transition.
+ task->_exec_state = PipelineTask::State::BLOCKED;
+ EXPECT_TRUE(task->_state_transition(PipelineTask::State::FINISHED).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+ task->_wake_up_early = false;
+
+ // Test that wake_up() succeeds when the task has already finished
(delayed wake_up race).
+ // _state_transition(RUNNABLE) is a no-op, and wake_up() should not
re-submit the task.
+ {
+ task->_wake_up_early = true;
+ std::mutex mtx;
+ task->_exec_state = PipelineTask::State::FINISHED;
+ auto dep = std::make_shared<Dependency>(0, 0, "test_dep", true);
+ task->_blocked_dep = dep.get();
+ std::unique_lock<std::mutex> lc(mtx);
+ EXPECT_TRUE(task->wake_up(dep.get(), lc).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+ }
+ {
+ std::mutex mtx;
+ task->_exec_state = PipelineTask::State::FINALIZED;
+ auto dep = std::make_shared<Dependency>(0, 0, "test_dep", true);
+ task->_blocked_dep = dep.get();
+ std::unique_lock<std::mutex> lc(mtx);
+ EXPECT_TRUE(task->wake_up(dep.get(), lc).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+ task->_wake_up_early = false;
+ }
}
TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]