This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a42536efc11 [feature](pipelineX) release dependency when task close (#25633)
a42536efc11 is described below
commit a42536efc11782b29ab8a55226486c173cd7a2ab
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 19 19:42:50 2023 +0800
[feature](pipelineX) release dependency when task close (#25633)
---
be/src/pipeline/pipeline_task.h | 2 ++
be/src/pipeline/pipeline_x/pipeline_x_task.h | 5 +++++
be/src/pipeline/task_scheduler.cpp | 2 +-
3 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 417fab35b97..41f60c27653 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -184,6 +184,8 @@ public:
_previous_schedule_id = id;
}
+ virtual void release_dependency() {}
+
bool has_dependency();
OperatorPtr get_root() { return _root; }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 9fff7c15af2..fa977c68f63 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -129,6 +129,11 @@ public:
}
}
+ void release_dependency() override {
+ std::vector<DependencySPtr> {}.swap(_downstream_dependency);
+ DependencyMap {}.swap(_upstream_dependency);
+ }
+
std::vector<DependencySPtr>& get_upstream_dependency(int id) {
if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
_upstream_dependency.insert({id, {DependencySPtr {}}});
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index c58e4c5cf07..7b6d72e059d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -342,10 +342,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
}
- DCHECK(!task->is_pending_finish()) << task->debug_string();
}
task->set_state(state);
task->set_close_pipeline_time();
+ task->release_dependency();
task->fragment_context()->close_a_pipeline();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]