This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fed20f06899 [fix](be) Prevent finalized pipeline task resubmission (#62891)
fed20f06899 is described below

commit fed20f06899663d9a6e1812efa1f5b1b1716e246
Author: Pxl <[email protected]>
AuthorDate: Wed May 6 11:55:48 2026 +0800

    [fix](be) Prevent finalized pipeline task resubmission (#62891)
    
    Problem Summary: `Pipeline::make_all_runnable()` wakes upstream/related
    pipeline tasks early by calling `set_wake_up_early()` and then
    `unblock_all_dependencies()`. `unblock_all_dependencies()` marks every
    dependency ready/always-ready, and `Dependency::set_ready()`
    synchronously calls back into `PipelineTask::wake_up()` for blocked
    tasks. That wake-up path may submit the task again, and
    `HybridTaskScheduler::submit()` immediately inspects the task via
    `is_blockable()`.
    
    At the same time, once a task returns `done=true`,
    `TaskScheduler::task_running_defer` calls `close_task()`, which runs
    `close()` and `finalize()`. `finalize()` releases task-owned resources
    such as `_sink`, `_operators`, shared states, and `_block`.
    
    Before this fix, dependency pointer lifetime and forced-unblock/finalize
    ordering were protected by separate synchronization concepts. That made
    the ownership boundary unclear: forced unblocking needs the dependency
    containers and raw `Dependency*` pointers to remain valid, and it also
    needs task-owned operator/shared state to remain stable while
    `set_ready()` may synchronously trigger `wake_up()->submit()`. If
    close/finalize releases task resources while forced unblocking is still
    firing wake-up callbacks, the delayed wake-up can submit or inspect a
    task whose `_sink`/`_operators` have already been released, leading to
    crashes or invalid task state/resource access.
    
    This PR uses one clearly named `_dependency_lifecycle_lock` for the
    task/dependency lifetime boundary:
    
    - It protects the dependency containers and the raw `Dependency*`
    pointers stored in them.
    - `unblock_all_dependencies()` holds it while marking dependencies
    ready. Since `Dependency::set_ready()` invokes `PipelineTask::wake_up()`
    synchronously, wake-ups caused by forced unblocking complete before the
    lock is released.
    - `close()` takes the same lock only around the short `FINISHED` state
    publication, so delayed wake-ups see a stable terminal transition.
    - `finalize()` takes the same lock while publishing `FINALIZED` and
    releasing task-owned operator/shared state.
    - `wake_up()` itself does not take this lock, avoiding re-entrant
    deadlock when `set_ready()` calls back into `wake_up()` while
    `unblock_all_dependencies()` already holds the lock.
    
    This lock intentionally only serializes forced unblocking with
    close/finalize. A normal dependency can still call
    `Dependency::set_ready()` from another thread and invoke `wake_up()`
    without holding `_dependency_lifecycle_lock`, so the state transition
    itself must also be race-safe. This PR therefore keeps
    `_state_transition()` as a CAS/retry loop: each transition validates the
    observed state and publishes the new state with
    `compare_exchange_strong`. If a delayed `wake_up()` observes `BLOCKED`
    but `close()/finalize()` publishes `FINISHED/FINALIZED` before the CAS,
    the CAS fails, the wake-up retries, sees the terminal state, and treats
    `FINISHED/FINALIZED -> RUNNABLE` as an atomic no-op. That prevents a
    stale wake-up from overwriting terminal state back to `RUNNABLE` and
    resubmitting a task whose resources were already released.
    
    This does not introduce a broad scheduling lock. The long/meaningful
    critical section is the same forced-unblock path that already needed
    dependency lifetime protection; `close()` only adds a short
    state-transition critical section, and `finalize()` already had to wait
    before clearing the same state. The combined design makes the intended
    lifecycle ordering explicit: forced unblocking keeps task resources
    stable while firing synchronous wake-up callbacks, and the CAS state
    machine protects delayed wake-ups from racing with terminal transitions
    outside that forced-unblock path.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Unit Test
        - `./run-be-ut.sh --run --filter=PipelineTaskTest.*`
        - `build-support/check-format.sh`
    - `build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN` (failed
    due to pre-existing `pipeline_task.cpp` `execute()` complexity warnings,
    pre-existing `pipeline_task_test.cpp` `TestBody` complexity warnings,
    and `jni-util.h` static_assert errors)
    - Behavior changed: No
    - Does this need documentation: No
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 be/src/exec/pipeline/pipeline_task.cpp       |  35 ++++++---
 be/src/exec/pipeline/pipeline_task.h         |   6 +-
 be/test/exec/pipeline/pipeline_task_test.cpp | 105 +++++++++++++++++++++++++++
 3 files changed, 135 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp
index b98bdae81de..fe75d7b499d 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -154,7 +154,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co
     {
         const auto& deps =
                 _state->get_local_state(_source->operator_id())->execution_dependencies();
-        std::unique_lock<std::mutex> lc(_dependency_lock);
+        std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
         std::copy(deps.begin(), deps.end(),
                   std::inserter(_execution_dependencies, _execution_dependencies.end()));
     }
@@ -200,7 +200,7 @@ Status PipelineTask::_extract_dependencies() {
         }
     }
     {
-        std::unique_lock<std::mutex> lc(_dependency_lock);
+        std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
         read_dependencies.swap(_read_dependencies);
         write_dependencies.swap(_write_dependencies);
         finish_dependencies.swap(_finish_dependencies);
@@ -345,12 +345,18 @@ bool PipelineTask::_is_blocked() {
 }
 
 void PipelineTask::unblock_all_dependencies() {
-    // We use a lock to assure all dependencies are not deconstructed here.
-    std::unique_lock<std::mutex> lc(_dependency_lock);
+    // Keep dependency pointers and task-owned operator/shared state stable because set_ready() may
+    // synchronously call wake_up() and submit this task.
+    std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
     auto fragment = _fragment_context.lock();
     if (!is_finalized() && fragment) {
         try {
             DCHECK(_wake_up_early || fragment->is_canceled());
+            DBUG_EXECUTE_IF("PipelineTask::unblock_all_dependencies.before_set_ready", {
+                if (dp->callback.has_value()) {
+                    DBUG_RUN_CALLBACK();
+                }
+            });
             std::ranges::for_each(_write_dependencies,
                                   [&](Dependency* dep) { dep->set_always_ready(); });
             std::ranges::for_each(_finish_dependencies,
@@ -882,8 +888,9 @@ Status PipelineTask::finalize() {
         return Status::OK();
     }
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
+    // Synchronize with unblock_all_dependencies() before clearing state used by wake_up()->submit().
+    std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
     RETURN_IF_ERROR(_state_transition(State::FINALIZED));
-    std::unique_lock<std::mutex> lc(_dependency_lock);
     _sink_shared_state.reset();
     _op_shared_states.clear();
     _shared_state_map.clear();
@@ -920,6 +927,8 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
     }
 
     if (close_sink) {
+        // Synchronize FINISHED with forced unblocking so delayed wake_up() sees a stable state.
+        std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
         RETURN_IF_ERROR(_state_transition(State::FINISHED));
     }
     return s;
@@ -939,7 +948,7 @@ std::string PipelineTask::debug_string() {
                    _index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(),
                    _wake_by, _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling,
                    is_running());
-    std::unique_lock<std::mutex> lc(_dependency_lock);
+    std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
     auto* cur_blocked_dep = _blocked_dep;
     auto fragment = _fragment_context.lock();
     if (is_finalized() || !fragment) {
@@ -1076,12 +1085,18 @@ void PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep
 Status PipelineTask::_state_transition(State new_state) {
     const auto& table =
             _wake_up_early ? WAKE_UP_EARLY_LEGAL_STATE_TRANSITION : LEGAL_STATE_TRANSITION;
-    if (!table[(int)new_state].contains(_exec_state)) {
+    auto current_state = _exec_state.load();
+    if (!table[(int)new_state].contains(current_state)) {
         return Status::InternalError(
-                "Task state transition from {} to {} is not allowed! Task info: {}",
-                _to_string(_exec_state), _to_string(new_state), debug_string());
+                "Task state transition from {} to {} is not allowed! Task: query_id={}, "
+                "instance_id={}, id={}, pipeline={}, open={}, eos={}, dry_run={}, "
+                "wake_up_early={}, wake_by={}, spilling={}, running={}",
+                _to_string(current_state), _to_string(new_state), print_id(_query_id),
+                print_id(_state->fragment_instance_id()), _index, _pipeline_name, _opened,
+                _eos.load(), _dry_run, _wake_up_early.load(), _wake_by.load(), _spilling.load(),
+                is_running());
     }
-    // FINISHED/FINALIZED → RUNNABLE is legal under wake_up_early (delayed wake_up() arriving
+    // FINISHED/FINALIZED -> RUNNABLE is legal under wake_up_early (delayed wake_up() arriving
     // after the task already terminated), but we must not actually move the state backwards
     // or update profile info (which would misleadingly show RUNNABLE for a terminated task).
     bool need_move = !((_exec_state == State::FINISHED || _exec_state == State::FINALIZED) &&
diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h
index bb1440bfcb7..2268c00a4c7 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -271,7 +271,11 @@ private:
     Dependency* _blocked_dep = nullptr;
 
     Dependency* _memory_sufficient_dependency;
-    std::mutex _dependency_lock;
+    // Protects dependency containers and the raw Dependency pointers they contain. It also
+    // serializes forced dependency unblocking with close()/finalize(): set_ready() may synchronously
+    // call wake_up() and submit this task, so close()/finalize() must not clear operator/shared
+    // state until forced unblocking finishes. wake_up() must not take this lock.
+    std::mutex _dependency_lifecycle_lock;
 
     std::atomic<bool> _running {false};
     std::atomic<bool> _eos {false};
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp
index 34d007be4c2..d6294825288 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,11 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include <chrono>
+#include <functional>
+#include <future>
+#include <thread>
+
 #include "common/config.h"
 #include "common/status.h"
 #include "exec/operator/operator.h"
@@ -34,6 +39,7 @@
 #include "testutil/mock/mock_thread_mem_tracker_mgr.h"
 #include "testutil/mock/mock_workload_group_mgr.h"
 #include "util/debug_points.h"
+#include "util/defer_op.h"
 
 namespace doris {
 
@@ -94,6 +100,19 @@ private:
 template class OperatorX<DummyOperatorLocalState>;
 template class DataSinkOperatorX<DummySinkLocalState>;
 
+class BlockableSubmitTaskScheduler : public MockTaskScheduler {
+public:
+    Status submit(PipelineTaskSPtr task) override {
+        if (on_submit) {
+            on_submit(task);
+        }
+        static_cast<void>(task->is_blockable());
+        return MockTaskScheduler::submit(task);
+    }
+
+    std::function<void(PipelineTaskSPtr)> on_submit;
+};
+
 TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
     auto num_instances = 1;
     auto pip_id = 0;
@@ -557,6 +576,92 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
     }
 }
 
+TEST_F(PipelineTaskTest, TEST_WAKE_UP_SUBMIT_PROTECTED_FROM_FINALIZE) {
+    auto scheduler = std::make_unique<BlockableSubmitTaskScheduler>();
+    auto* scheduler_ptr = scheduler.get();
+    _query_ctx->_task_scheduler = scheduler_ptr;
+
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    OperatorPtr source_op;
+    source_op.reset(new DummyOperator());
+    EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+    int op_id = 1;
+    int node_id = 2;
+    int dest_id = 3;
+    DataSinkOperatorPtr sink_op;
+    sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+    EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+
+    std::vector<TScanRangeParams> scan_range;
+    int sender_id = 0;
+    TDataSink tsink;
+    EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    task->_wake_up_early = true;
+    auto dep = std::make_shared<Dependency>(0, 0, "test_dep", false);
+    task->_execution_dependencies.push_back(dep.get());
+    EXPECT_EQ(dep->is_blocked_by(task), dep.get());
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+
+    auto origin_enable_debug_points = config::enable_debug_points;
+    config::enable_debug_points = true;
+    Defer debug_point_cleanup {[&]() {
+        DebugPoints::instance()->remove("PipelineTask::unblock_all_dependencies.before_set_ready");
+        config::enable_debug_points = origin_enable_debug_points;
+        _query_ctx->_task_scheduler = _task_scheduler.get();
+    }};
+
+    std::promise<void> wake_up_reached_promise;
+    auto wake_up_reached = wake_up_reached_promise.get_future();
+    std::promise<void> release_wake_up_promise;
+    auto release_wake_up = release_wake_up_promise.get_future();
+    DebugPoints::instance()->add_with_callback(
+            "PipelineTask::unblock_all_dependencies.before_set_ready", std::function<void()>([&]() {
+                wake_up_reached_promise.set_value();
+                release_wake_up.wait();
+            }));
+
+    scheduler_ptr->on_submit = [&](PipelineTaskSPtr submitted_task) {
+        EXPECT_EQ(submitted_task.get(), task.get());
+        EXPECT_NE(task->_sink, nullptr);
+        EXPECT_FALSE(task->_operators.empty());
+    };
+
+    std::thread unblock_thread([&]() { task->unblock_all_dependencies(); });
+    EXPECT_EQ(wake_up_reached.wait_for(std::chrono::seconds(10)), std::future_status::ready);
+
+    std::promise<void> close_started_promise;
+    auto close_started = close_started_promise.get_future();
+    auto close_finalize = std::async(std::launch::async, [&]() {
+        close_started_promise.set_value();
+        EXPECT_TRUE(task->close(Status::OK()).ok());
+        EXPECT_TRUE(task->finalize().ok());
+    });
+    EXPECT_EQ(close_started.wait_for(std::chrono::seconds(10)), std::future_status::ready);
+    EXPECT_EQ(close_finalize.wait_for(std::chrono::milliseconds(100)), std::future_status::timeout);
+
+    release_wake_up_promise.set_value();
+    unblock_thread.join();
+    close_finalize.wait();
+
+    EXPECT_EQ(scheduler_ptr->submit_count(), 1);
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+    EXPECT_EQ(task->_sink, nullptr);
+    EXPECT_TRUE(task->_operators.empty());
+}
+
 TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
     auto num_instances = 1;
     auto pip_id = 0;

