This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 977c1f1134c [Bug](pipeline) do not treat BLOCKED->FINISHED as error when wake_up_early is true (#61989)
977c1f1134c is described below

commit 977c1f1134c6ce93fa6a5e2cb6675515ed3ae785
Author: Pxl <[email protected]>
AuthorDate: Fri Apr 3 16:33:10 2026 +0800

    [Bug](pipeline) do not treat BLOCKED->FINISHED as error when wake_up_early is true (#61989)
    
    This pull request improves the state transition logic for `PipelineTask`
    to handle rare race conditions when a delayed `wake_up()` call arrives
    after a task has already finished or finalized. It introduces an
    extended state transition table for the `_wake_up_early` scenario,
    ensures that illegal or backward state transitions are prevented, and
    adds comprehensive unit tests for these new behaviors.
    
    **State transition logic improvements:**
    
    * Added a new `WAKE_UP_EARLY_LEGAL_STATE_TRANSITION` table to allow
    certain additional state transitions when `_wake_up_early` is set,
    specifically permitting `BLOCKED→FINISHED`, `FINISHED→RUNNABLE`, and
    `FINALIZED→RUNNABLE` transitions under this mode.
    * Updated `_state_transition` to use the appropriate transition table
    based on `_wake_up_early`, and to treat `FINISHED/FINALIZED→RUNNABLE` as
    a legal but no-op transition (state remains unchanged).
    
    **Handling of delayed wake-up calls:**
    
    * Modified `wake_up()` to avoid resubmitting a task to the scheduler if
    a delayed wake-up arrives after the task has already finished or
    finalized, preventing unintended re-execution.
    
    **Documentation and testing:**
    
    * Expanded the state machine documentation in `PipelineTask` to describe
    the new transitions allowed by `_wake_up_early`.
    * Added thorough unit tests to verify both normal and `_wake_up_early`
    state transitions, including edge cases for no-op transitions and
    correct handling of delayed wake-ups.
---
 be/src/exec/pipeline/pipeline_task.cpp       | 12 +++++-
 be/src/exec/pipeline/pipeline_task.h         | 17 ++++++++
 be/test/exec/pipeline/pipeline_task_test.cpp | 58 +++++++++++++++++++++++++++-
 3 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp
index 5999854d5b1..0e98c8a2643 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -1063,12 +1063,20 @@ Status PipelineTask::_state_transition(State new_state) {
     }
     _task_profile->add_info_string("TaskState", _to_string(new_state));
     _task_profile->add_info_string("BlockedByDependency", _blocked_dep ? 
_blocked_dep->name() : "");
-    if (!LEGAL_STATE_TRANSITION[(int)new_state].contains(_exec_state)) {
+    const auto& table =
+            _wake_up_early ? WAKE_UP_EARLY_LEGAL_STATE_TRANSITION : 
LEGAL_STATE_TRANSITION;
+    if (!table[(int)new_state].contains(_exec_state)) {
         return Status::InternalError(
                 "Task state transition from {} to {} is not allowed! Task 
info: {}",
                 _to_string(_exec_state), _to_string(new_state), 
debug_string());
     }
-    _exec_state = new_state;
+    // FINISHED/FINALIZED → RUNNABLE is legal under wake_up_early (delayed wake_up() arriving
+    // after the task already terminated), but we must not actually move the state backwards.
+    bool need_move = !((_exec_state == State::FINISHED || _exec_state == 
State::FINALIZED) &&
+                       new_state == State::RUNNABLE);
+    if (need_move) {
+        _exec_state = new_state;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h
index 999bdb07116..732de08ad9c 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -279,11 +279,19 @@ private:
     std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
 
     /**
+         * Normal state machine:
          *
          * INITED -----> RUNNABLE -------------------------+----> FINISHED 
---+---> FINALIZED
          *                   ^                             |                  |
          *                   |                             |                  |
          *                   +----------- BLOCKED <--------+------------------+
+         *
+         * When _wake_up_early is set by make_all_runnable(), additional transitions are allowed:
+         *   BLOCKED    → FINISHED : task skips RUNNABLE, terminates directly
+         *   FINISHED   → RUNNABLE : delayed wake_up() arrives after task already finished,
+         *                           legal but no-op (state stays FINISHED)
+         *   FINALIZED  → RUNNABLE : same as above but task already finalized,
+         *                           legal but no-op (state stays FINALIZED)
          */
     enum class State : int {
         INITED,
@@ -299,6 +307,15 @@ private:
             {State::RUNNABLE},                                // Target state 
is FINISHED
             {State::INITED, State::FINISHED}};                // Target state 
is FINALIZED
 
+    // Extended table used when _wake_up_early is true.
+    const std::vector<std::set<State>> WAKE_UP_EARLY_LEGAL_STATE_TRANSITION = {
+            {}, // INITED
+            {State::INITED, State::RUNNABLE, State::BLOCKED, State::FINISHED,
+             State::FINALIZED},                 // RUNNABLE (+ FINISHED, 
FINALIZED)
+            {State::RUNNABLE, State::FINISHED}, // BLOCKED
+            {State::RUNNABLE, State::BLOCKED},  // FINISHED (+ BLOCKED)
+            {State::INITED, State::FINISHED}};  // FINALIZED
+
     std::string _to_string(State state) const {
         switch (state) {
         case State::INITED:
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp
index fb55afbc9de..8029270ea47 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -479,14 +479,68 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
         EXPECT_GT(task->_execution_dependencies.size(), 1);
     }
+    // Test normal LEGAL_STATE_TRANSITION table (with _wake_up_early = false).
+    task->_wake_up_early = false;
     for (int i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) {
         auto target = (PipelineTask::State)i;
         for (int j = 0; j < task->LEGAL_STATE_TRANSITION.size(); j++) {
-            task->_exec_state = (PipelineTask::State)j;
+            auto from = (PipelineTask::State)j;
+            task->_exec_state = from;
             EXPECT_EQ(task->_state_transition(target).ok(),
-                      
task->LEGAL_STATE_TRANSITION[i].contains((PipelineTask::State)j));
+                      task->LEGAL_STATE_TRANSITION[i].contains(from));
         }
     }
+
+    // Test WAKE_UP_EARLY_LEGAL_STATE_TRANSITION table.
+    task->_wake_up_early = true;
+    for (int i = 0; i < task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION.size(); 
i++) {
+        auto target = (PipelineTask::State)i;
+        for (int j = 0; j < task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION.size(); 
j++) {
+            auto from = (PipelineTask::State)j;
+            task->_exec_state = from;
+            EXPECT_EQ(task->_state_transition(target).ok(),
+                      
task->WAKE_UP_EARLY_LEGAL_STATE_TRANSITION[i].contains(from));
+        }
+    }
+
+    // FINISHED→RUNNABLE under wake_up_early is legal but no-op: state stays FINISHED.
+    task->_exec_state = PipelineTask::State::FINISHED;
+    EXPECT_TRUE(task->_state_transition(PipelineTask::State::RUNNABLE).ok());
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+
+    // FINALIZED→RUNNABLE under wake_up_early is legal but no-op: state stays FINALIZED.
+    task->_exec_state = PipelineTask::State::FINALIZED;
+    EXPECT_TRUE(task->_state_transition(PipelineTask::State::RUNNABLE).ok());
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+
+    // BLOCKED→FINISHED under wake_up_early does transition.
+    task->_exec_state = PipelineTask::State::BLOCKED;
+    EXPECT_TRUE(task->_state_transition(PipelineTask::State::FINISHED).ok());
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+    task->_wake_up_early = false;
+
+    // Test that wake_up() succeeds when the task has already finished (delayed wake_up race).
+    // _state_transition(RUNNABLE) is a no-op, and wake_up() should not re-submit the task.
+    {
+        task->_wake_up_early = true;
+        std::mutex mtx;
+        task->_exec_state = PipelineTask::State::FINISHED;
+        auto dep = std::make_shared<Dependency>(0, 0, "test_dep", true);
+        task->_blocked_dep = dep.get();
+        std::unique_lock<std::mutex> lc(mtx);
+        EXPECT_TRUE(task->wake_up(dep.get(), lc).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+    }
+    {
+        std::mutex mtx;
+        task->_exec_state = PipelineTask::State::FINALIZED;
+        auto dep = std::make_shared<Dependency>(0, 0, "test_dep", true);
+        task->_blocked_dep = dep.get();
+        std::unique_lock<std::mutex> lc(mtx);
+        EXPECT_TRUE(task->wake_up(dep.get(), lc).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+        task->_wake_up_early = false;
+    }
 }
 
 TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to