This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d668e8d0b [DEBUG](Log) Add debug string for pipeline task cacnel 
(#20026)
2d668e8d0b is described below

commit 2d668e8d0ba69c921eebdbc9c6422a654240687e
Author: HappenLee <[email protected]>
AuthorDate: Thu May 25 09:58:31 2023 +0800

    [DEBUG](Log) Add debug string for pipeline task cacnel (#20026)
---
 be/src/pipeline/pipeline_task.cpp  | 23 ++++++++++++++++-------
 be/src/pipeline/pipeline_task.h    |  3 ++-
 be/src/pipeline/task_scheduler.cpp |  9 +++++++++
 be/src/runtime/fragment_mgr.cpp    |  8 ++++++++
 4 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index b5a0e5b602..41c4e6b549 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -42,6 +42,14 @@ class TaskGroup;
 
 namespace doris::pipeline {
 
+void PipelineTask::_fresh_profile_counter() {
+    COUNTER_SET(_wait_source_timer, 
(int64_t)_wait_source_watcher.elapsed_time());
+    COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
+    COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
+    COUNTER_SET(_wait_worker_timer, 
(int64_t)_wait_worker_watcher.elapsed_time());
+    COUNTER_SET(_wait_schedule_timer, 
(int64_t)_wait_schedule_watcher.elapsed_time());
+}
+
 void PipelineTask::_init_profile() {
     std::stringstream ss;
     ss << "PipelineTask"
@@ -255,12 +263,8 @@ Status PipelineTask::close() {
         }
     }
     if (_opened) {
-        COUNTER_UPDATE(_wait_source_timer, 
_wait_source_watcher.elapsed_time());
-        COUNTER_UPDATE(_schedule_counts, _schedule_time);
-        COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
-        COUNTER_UPDATE(_wait_worker_timer, 
_wait_worker_watcher.elapsed_time());
-        COUNTER_UPDATE(_wait_schedule_timer, 
_wait_schedule_watcher.elapsed_time());
-        COUNTER_UPDATE(_close_timer, close_ns);
+        _fresh_profile_counter();
+        COUNTER_SET(_close_timer, close_ns);
         COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
     }
     return s;
@@ -296,8 +300,13 @@ void PipelineTask::set_state(PipelineTaskState state) {
     _cur_state = state;
 }
 
-std::string PipelineTask::debug_string() const {
+std::string PipelineTask::debug_string() {
     fmt::memory_buffer debug_string_buffer;
+    std::stringstream profile_ss;
+    _fresh_profile_counter();
+    _task_profile->pretty_print(&profile_ss, "");
+
+    fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
     fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = 
{}]\noperators: ", _index,
                    get_state_name(_cur_state));
     for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index d17d13dda7..5039b36bb4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -192,7 +192,7 @@ public:
 
     OperatorPtr get_root() { return _root; }
 
-    std::string debug_string() const;
+    std::string debug_string();
 
     taskgroup::TaskGroup* get_task_group() const;
 
@@ -217,6 +217,7 @@ public:
 private:
     Status _open();
     void _init_profile();
+    void _fresh_profile_counter();
 
     uint32_t _index;
     PipelinePtr _pipeline;
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 53e9a4d868..ef4ad829c5 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,6 +118,15 @@ void BlockedTaskScheduler::_schedule() {
                                    PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->fragment_context()->is_canceled()) {
+                std::string task_ds;
+#ifndef NDEBUG
+                task_ds = task->debug_string();
+#endif
+                LOG(WARNING) << "Canceled, query_id=" << 
print_id(task->query_context()->query_id)
+                             << ", instance_id="
+                             << 
print_id(task->fragment_context()->get_fragment_instance_id())
+                             << (task_ds.empty() ? "" : task_ds);
+
                 if (task->is_pending_finish()) {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6912e58275..c1413b0145 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -900,6 +900,8 @@ void FragmentMgr::_set_scan_concurrency(const Param& 
params, QueryContext* query
 
 void FragmentMgr::cancel(const TUniqueId& fragment_id, const 
PPlanFragmentCancelReason& reason,
                          const std::string& msg) {
+    bool find_the_fragment = false;
+
     std::shared_ptr<FragmentExecState> exec_state;
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -909,6 +911,7 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, 
const PPlanFragmentCancel
         }
     }
     if (exec_state) {
+        find_the_fragment = true;
         exec_state->cancel(reason, msg);
     }
 
@@ -921,8 +924,13 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, 
const PPlanFragmentCancel
         }
     }
     if (pipeline_fragment_ctx) {
+        find_the_fragment = true;
         pipeline_fragment_ctx->cancel(reason, msg);
     }
+
+    if (!find_the_fragment) {
+        LOG(WARNING) << "Do not find the fragment instance id:" << fragment_id 
<< " to cancel";
+    }
 }
 
 void FragmentMgr::cancel_query(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to