This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd4face000 [refactor](callonce) remove callonce usage in pipeline 
task to get full stack trace when core (#27331)
ffd4face000 is described below

commit ffd4face000e58af86d3695caaf697ffafa8d210
Author: yiguolei <[email protected]>
AuthorDate: Tue Nov 21 19:42:39 2023 +0800

    [refactor](callonce) remove callonce usage in pipeline task to get full 
stack trace when core (#27331)
    
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp              | 12 +++++++++---
 be/src/pipeline/pipeline_fragment_context.h                |  4 ++--
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp |  8 ++++++--
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h   |  2 +-
 be/src/pipeline/task_scheduler.cpp                         |  4 ++++
 5 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index be4fd6c5aea..7ccfe251a02 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -713,7 +713,7 @@ Status PipelineFragmentContext::submit() {
     if (!st.ok()) {
         std::lock_guard<std::mutex> l(_task_mutex);
         if (_closed_tasks == _total_tasks) {
-            std::call_once(_close_once_flag, [this] { _close_action(); });
+            _close_fragment_instance();
         }
         return Status::InternalError("Submit pipeline failed. err = {}, BE: 
{}", st.to_string(),
                                      BackendOptions::get_localhost());
@@ -851,7 +851,13 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
     return _root_pipeline->set_sink(sink_);
 }
 
-void PipelineFragmentContext::_close_action() {
+// If all pipeline tasks binded to the fragment instance are finished, then we 
could
+// close the fragment instance.
+void PipelineFragmentContext::_close_fragment_instance() {
+    if (_is_fragment_instance_closed) {
+        return;
+    }
+    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
     
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // all submitted tasks done
@@ -862,7 +868,7 @@ void PipelineFragmentContext::close_a_pipeline() {
     std::lock_guard<std::mutex> l(_task_mutex);
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
-        std::call_once(_close_once_flag, [this] { _close_action(); });
+        _close_fragment_instance();
     }
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index aa2f139c507..39e3dcbe169 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -153,7 +153,7 @@ protected:
     virtual Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request);
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
-    virtual void _close_action();
+    virtual void _close_fragment_instance();
     void _init_next_report_time();
     void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
@@ -205,7 +205,7 @@ protected:
     RuntimeProfile::Counter* _prepare_timer;
 
     std::function<void(RuntimeState*, Status*)> _call_back;
-    std::once_flag _close_once_flag;
+    bool _is_fragment_instance_closed = false;
 
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 28e1be496f4..b087c3bcab8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -939,7 +939,7 @@ Status PipelineXFragmentContext::submit() {
     if (!st.ok()) {
         std::lock_guard<std::mutex> l(_task_mutex);
         if (_closed_tasks == _total_tasks) {
-            std::call_once(_close_once_flag, [this] { _close_action(); });
+            _close_fragment_instance();
         }
         return Status::InternalError("Submit pipeline failed. err = {}, BE: 
{}", st.to_string(),
                                      BackendOptions::get_localhost());
@@ -969,7 +969,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
     }
 }
 
-void PipelineXFragmentContext::_close_action() {
+void PipelineXFragmentContext::_close_fragment_instance() {
+    if (_is_fragment_instance_closed) {
+        return;
+    }
+    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
     
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // all submitted tasks done
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 9e7ff42219f..23ff08fcb0a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -118,7 +118,7 @@ public:
     std::string debug_string() override;
 
 private:
-    void _close_action() override;
+    void _close_fragment_instance() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
     Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, 
PipelinePtr& cur_pipe,
                                const std::vector<TExpr>& texprs);
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index e989af75b20..5470ba22591 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -390,6 +390,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state,
     task->set_close_pipeline_time();
     task->release_dependency();
     task->set_running(false);
+    // close_a_pipeline may delete fragment context and will core in some defer
+    // code, because the defer code will access fragment context it self.
+    std::shared_ptr<PipelineFragmentContext> lock_for_context =
+            task->fragment_context()->shared_from_this();
     task->fragment_context()->close_a_pipeline();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to