This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a42536efc11  [feature](pipelineX) release dependency when task close (#25633)
a42536efc11 is described below

commit a42536efc11782b29ab8a55226486c173cd7a2ab
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 19 19:42:50 2023 +0800

     [feature](pipelineX) release dependency when task close (#25633)
---
 be/src/pipeline/pipeline_task.h              | 2 ++
 be/src/pipeline/pipeline_x/pipeline_x_task.h | 5 +++++
 be/src/pipeline/task_scheduler.cpp           | 2 +-
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 417fab35b97..41f60c27653 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -184,6 +184,8 @@ public:
         _previous_schedule_id = id;
     }
 
+    virtual void release_dependency() {}
+
     bool has_dependency();
 
     OperatorPtr get_root() { return _root; }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 9fff7c15af2..fa977c68f63 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -129,6 +129,11 @@ public:
         }
     }
 
+    void release_dependency() override {
+        std::vector<DependencySPtr> {}.swap(_downstream_dependency);
+        DependencyMap {}.swap(_upstream_dependency);
+    }
+
     std::vector<DependencySPtr>& get_upstream_dependency(int id) {
         if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
             _upstream_dependency.insert({id, {DependencySPtr {}}});
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index c58e4c5cf07..7b6d72e059d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -342,10 +342,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
                                           Status::Cancelled(status.to_string()));
             state = PipelineTaskState::CANCELED;
         }
-        DCHECK(!task->is_pending_finish()) << task->debug_string();
     }
     task->set_state(state);
     task->set_close_pipeline_time();
+    task->release_dependency();
     task->fragment_context()->close_a_pipeline();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to