This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 491d628569e [minor](pipeline) Add lock restriction (#56822)
491d628569e is described below

commit 491d628569e1774731aea982fcb9e5e6715a68a2
Author: Gabriel <[email protected]>
AuthorDate: Sat Oct 11 15:53:09 2025 +0800

    [minor](pipeline) Add lock restriction (#56822)
---
 be/src/pipeline/dependency.cpp    | 4 ++--
 be/src/pipeline/pipeline_task.cpp | 2 +-
 be/src/pipeline/pipeline_task.h   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 3062116e363..f3f69513cf0 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -85,7 +85,7 @@ void Dependency::set_ready() {
     for (auto task : local_block_task) {
         if (auto t = task.lock()) {
             std::unique_lock<std::mutex> lc(_task_lock);
-            THROW_IF_ERROR(t->wake_up(this));
+            THROW_IF_ERROR(t->wake_up(this, lc));
         }
     }
 }
@@ -96,7 +96,7 @@ Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
     if (!ready && task) {
         _add_block_task(task);
         start_watcher();
-        THROW_IF_ERROR(task->blocked(this));
+        THROW_IF_ERROR(task->blocked(this, lc));
     }
     return ready ? nullptr : this;
 }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6b1b5b2f9cb..d6e8c5bf2e1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -828,7 +828,7 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co
     return Status::OK();
 }
 
-Status PipelineTask::wake_up(Dependency* dep) {
+Status PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */) {
     // call by dependency
     DCHECK_EQ(_blocked_dep, dep) << "dep : " << dep->debug_string(0) << "task: " << debug_string();
     _blocked_dep = nullptr;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 08b57e0c630..41019e4c598 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -120,7 +120,7 @@ public:
         return _op_shared_states[id].get();
     }
 
-    Status wake_up(Dependency* dep);
+    Status wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */);
 
     DataSinkOperatorPtr sink() const { return _sink; }
 
@@ -169,7 +169,7 @@ public:
     [[nodiscard]] size_t get_revocable_size() const;
     [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
 
-    Status blocked(Dependency* dependency) {
+    Status blocked(Dependency* dependency, std::unique_lock<std::mutex>& /* dep_lock */) {
         DCHECK_EQ(_blocked_dep, nullptr) << "task: " << debug_string();
         _blocked_dep = dependency;
         return _state_transition(PipelineTask::State::BLOCKED);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to