This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 491d628569e [minor](pipeline) Add lock restriction (#56822)
491d628569e is described below
commit 491d628569e1774731aea982fcb9e5e6715a68a2
Author: Gabriel <[email protected]>
AuthorDate: Sat Oct 11 15:53:09 2025 +0800
[minor](pipeline) Add lock restriction (#56822)
---
be/src/pipeline/dependency.cpp | 4 ++--
be/src/pipeline/pipeline_task.cpp | 2 +-
be/src/pipeline/pipeline_task.h | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 3062116e363..f3f69513cf0 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -85,7 +85,7 @@ void Dependency::set_ready() {
for (auto task : local_block_task) {
if (auto t = task.lock()) {
std::unique_lock<std::mutex> lc(_task_lock);
- THROW_IF_ERROR(t->wake_up(this));
+ THROW_IF_ERROR(t->wake_up(this, lc));
}
}
}
@@ -96,7 +96,7 @@ Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
if (!ready && task) {
_add_block_task(task);
start_watcher();
- THROW_IF_ERROR(task->blocked(this));
+ THROW_IF_ERROR(task->blocked(this, lc));
}
return ready ? nullptr : this;
}
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6b1b5b2f9cb..d6e8c5bf2e1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -828,7 +828,7 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co
return Status::OK();
}
-Status PipelineTask::wake_up(Dependency* dep) {
+Status PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /*
dep_lock */) {
// call by dependency
DCHECK_EQ(_blocked_dep, dep) << "dep : " << dep->debug_string(0) << "task:
" << debug_string();
_blocked_dep = nullptr;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 08b57e0c630..41019e4c598 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -120,7 +120,7 @@ public:
return _op_shared_states[id].get();
}
- Status wake_up(Dependency* dep);
+ Status wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock
*/);
DataSinkOperatorPtr sink() const { return _sink; }
@@ -169,7 +169,7 @@ public:
[[nodiscard]] size_t get_revocable_size() const;
[[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
- Status blocked(Dependency* dependency) {
+ Status blocked(Dependency* dependency, std::unique_lock<std::mutex>& /*
dep_lock */) {
DCHECK_EQ(_blocked_dep, nullptr) << "task: " << debug_string();
_blocked_dep = dependency;
return _state_transition(PipelineTask::State::BLOCKED);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]