This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ffd4face000 [refactor](callonce) remove callonce usage in pipeline
task to get full stack trace when core (#27331)
ffd4face000 is described below
commit ffd4face000e58af86d3695caaf697ffafa8d210
Author: yiguolei <[email protected]>
AuthorDate: Tue Nov 21 19:42:39 2023 +0800
[refactor](callonce) remove callonce usage in pipeline task to get full
stack trace when core (#27331)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 12 +++++++++---
be/src/pipeline/pipeline_fragment_context.h | 4 ++--
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++++--
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 4 ++++
5 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index be4fd6c5aea..7ccfe251a02 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -713,7 +713,7 @@ Status PipelineFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
- std::call_once(_close_once_flag, [this] { _close_action(); });
+ _close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE:
{}", st.to_string(),
BackendOptions::get_localhost());
@@ -851,7 +851,13 @@ Status PipelineFragmentContext::_create_sink(int
sender_id, const TDataSink& thr
return _root_pipeline->set_sink(sink_);
}
-void PipelineFragmentContext::_close_action() {
+// If all pipeline tasks bound to the fragment instance are finished, then we could
+// close the fragment instance.
+void PipelineFragmentContext::_close_fragment_instance() {
+ if (_is_fragment_instance_closed) {
+ return;
+ }
+ Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done
@@ -862,7 +868,7 @@ void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
- std::call_once(_close_once_flag, [this] { _close_action(); });
+ _close_fragment_instance();
}
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index aa2f139c507..39e3dcbe169 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -153,7 +153,7 @@ protected:
virtual Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
- virtual void _close_action();
+ virtual void _close_fragment_instance();
void _init_next_report_time();
void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
@@ -205,7 +205,7 @@ protected:
RuntimeProfile::Counter* _prepare_timer;
std::function<void(RuntimeState*, Status*)> _call_back;
- std::once_flag _close_once_flag;
+ bool _is_fragment_instance_closed = false;
// If this is set to false, and '_is_report_success' is false as well,
// This executor will not report status to FE on being cancelled.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 28e1be496f4..b087c3bcab8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -939,7 +939,7 @@ Status PipelineXFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
- std::call_once(_close_once_flag, [this] { _close_action(); });
+ _close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE:
{}", st.to_string(),
BackendOptions::get_localhost());
@@ -969,7 +969,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
}
}
-void PipelineXFragmentContext::_close_action() {
+void PipelineXFragmentContext::_close_fragment_instance() {
+ if (_is_fragment_instance_closed) {
+ return;
+ }
+ Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 9e7ff42219f..23ff08fcb0a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -118,7 +118,7 @@ public:
std::string debug_string() override;
private:
- void _close_action() override;
+ void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs);
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index e989af75b20..5470ba22591 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -390,6 +390,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
task->set_close_pipeline_time();
task->release_dependency();
task->set_running(false);
+ // close_a_pipeline may delete the fragment context, which would cause a core dump in
+ // deferred code that still accesses the fragment context itself, so keep it alive here.
+ std::shared_ptr<PipelineFragmentContext> lock_for_context =
+ task->fragment_context()->shared_from_this();
task->fragment_context()->close_a_pipeline();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]