This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d6c64d305f0 [chore](log) Add log to trace query execution #25739
d6c64d305f0 is described below

commit d6c64d305f0e906b2fd044db8469b6551716d509
Author: zhiqiang <[email protected]>
AuthorDate: Thu Oct 26 01:09:25 2023 -0500

    [chore](log) Add log to trace query execution #25739
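
    The logging pattern this change converges on is roughly the sketch below
    (a hedged illustration, not code from the patch: PrintInstanceStandardInfo
    and print_id are the helpers in be/src/util/debug_util.h, LOG_INFO is the
    fmt-style macro from common/logging.h, and TUniqueId is the
    Thrift-generated id type):

        #include "common/logging.h"  // LOG_INFO with fmt-style "{}" placeholders
        #include "util/debug_util.h" // PrintInstanceStandardInfo, print_id

        // Tag each trace line with the fixed instance/query id string so one
        // grep on either id follows a query across the BE logs.
        static void trace_execution(const TUniqueId& query_id, const TUniqueId& instance_id) {
            LOG_INFO("Instance {} executing",
                     PrintInstanceStandardInfo(query_id, instance_id));
            LOG_INFO("Query {} finished", print_id(query_id));
        }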
---
 be/src/agent/task_worker_pool.cpp                  |  3 --
 be/src/exec/exec_node.cpp                          |  7 +--
 be/src/pipeline/pipeline_fragment_context.cpp      | 24 +++++-----
 be/src/pipeline/pipeline_fragment_context.h        |  2 +-
 be/src/pipeline/task_scheduler.cpp                 | 15 +++++-
 be/src/runtime/fragment_mgr.cpp                    | 25 ++++++----
 be/src/runtime/plan_fragment_executor.cpp          | 18 +++----
 be/src/runtime/plan_fragment_executor.h            |  2 +
 be/src/runtime/query_context.h                     |  8 +++-
 be/src/runtime/runtime_state.h                     |  2 +-
 be/src/util/debug_util.cpp                         |  4 +-
 be/src/util/debug_util.h                           |  2 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            | 13 ++---
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  7 +--
 .../doris/common/profile/ExecutionProfile.java     | 19 +++++++-
 .../org/apache/doris/nereids/txn/Transaction.java  |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 55 +++++++++++++++++-----
 .../java/org/apache/doris/qe/QeProcessorImpl.java  | 16 ++++---
 .../java/org/apache/doris/qe/SimpleScheduler.java  |  4 --
 .../java/org/apache/doris/qe/StmtExecutor.java     | 11 ++++-
 20 files changed, 157 insertions(+), 82 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index ff22f7923c3..6717af3ce4b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1071,9 +1071,6 @@ void TaskWorkerPool::_handle_report(const TReportRequest& request, ReportType ty
                 .error(result.status);
     } else {
         is_report_success = true;
-        LOG_INFO("successfully report {}", TYPE_STRING(type))
-                .tag("host", _master_info.network_address.hostname)
-                .tag("port", _master_info.network_address.port);
     }
     switch (type) {
     case TASK:
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed8b11457a6..16964264edf 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -208,12 +208,13 @@ void ExecNode::release_resource(doris::RuntimeState* state) {
 
 Status ExecNode::close(RuntimeState* state) {
     if (_is_closed) {
-        LOG(INFO) << "fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+        LOG(INFO) << "query= " << print_id(state->query_id())
+                  << " fragment_instance_id=" << 
print_id(state->fragment_instance_id())
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << 
print_id(state->fragment_instance_id()) << ", "
-              << " id=" << _id << " type=" << print_plan_node_type(_type) << " 
closed";
+    LOG(INFO) << "query= " << print_id(state->query_id())
+              << " fragment_instance_id=" << 
print_id(state->fragment_instance_id()) << " closed";
     _is_closed = true;
 
     Status result;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 709e4df7652..686ff0fe0ad 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -166,8 +166,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             LOG(WARNING) << "PipelineFragmentContext "
-                         << PrintInstanceStandardInfo(_query_id, _fragment_id,
-                                                      _fragment_instance_id)
+                         << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                          << " is canceled, cancel message: " << msg;
 
         } else {
@@ -217,12 +216,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
         tracer = telemetry::get_tracer(print_id(_query_id));
     }
 
-    LOG_INFO("PipelineFragmentContext::prepare")
-            .tag("query_id", print_id(_query_id))
-            .tag("fragment_id", _fragment_id)
-            .tag("instance_id", print_id(local_params.fragment_instance_id))
-            .tag("backend_num", local_params.backend_num)
-            .tag("pthread_id", (uintptr_t)pthread_self());
+    LOG_INFO("Preparing instance {}, backend_num {}",
+             PrintInstanceStandardInfo(_query_id, local_params.fragment_instance_id),
+             local_params.backend_num);
 
     // 1. init _runtime_state
    _runtime_state = RuntimeState::create_unique(local_params, request.query_id,
@@ -284,8 +280,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     std::vector<ExecNode*> scan_nodes;
     std::vector<TScanRangeParams> no_scan_ranges;
     _root_plan->collect_scan_nodes(&scan_nodes);
-    VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size();
-    VLOG_CRITICAL << "params.per_node_scan_ranges.size()="
+    VLOG_CRITICAL << "query " << print_id(get_query_id())
+                  << " scan_nodes.size()=" << scan_nodes.size();
+    VLOG_CRITICAL << "query " << print_id(get_query_id()) << " 
params.per_node_scan_ranges.size()="
                   << local_params.per_node_scan_ranges.size();
 
     // set scan range in ScanNode
@@ -310,7 +307,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
            auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
                                                  no_scan_ranges);
             static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
-            VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
+            VLOG_CRITICAL << "query " << print_id(get_query_id())
+                          << "scan_node_Id=" << scan_node->id()
                           << " size=" << scan_ranges.get().size();
         }
     }
@@ -750,7 +748,9 @@ void PipelineFragmentContext::close_if_prepare_failed() {
     }
     for (auto& task : _tasks) {
         DCHECK(!task->is_pending_finish());
-        WARN_IF_ERROR(task->close(Status::OK()), "close_if_prepare_failed failed: ");
+        std::stringstream msg;
+        msg << "query " << print_id(_query_id) << " closed since prepare 
failed";
+        WARN_IF_ERROR(task->close(Status::OK()), msg.str());
         close_a_pipeline();
     }
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index ffbfaef987a..85834e513f9 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,7 +71,7 @@ public:
 
     PipelinePtr add_pipeline();
 
-    TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
+    TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; }
 
     virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
         return _runtime_state.get();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index ee6f5cdd829..4ef5def6f1d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -31,6 +31,7 @@
 #include <string>
 #include <thread>
 
+#include "common/logging.h"
 #include "common/signal_handler.h"
 #include "pipeline/pipeline_task.h"
 #include "pipeline/task_queue.h"
@@ -111,9 +112,9 @@ void BlockedTaskScheduler::_schedule() {
             if (state == PipelineTaskState::PENDING_FINISH) {
                 // should cancel or should finish
                 if (task->is_pending_finish()) {
-                    VLOG_DEBUG << "Task pending" << task->debug_string();
                     iter++;
                 } else {
+                    VLOG_DEBUG << "Task pending" << task->debug_string();
                    _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->query_context()->is_cancelled()) {
@@ -272,7 +273,6 @@ void TaskScheduler::_do_work(size_t index) {
             LOG(WARNING) << fmt::format(
                     "Pipeline task failed. query_id: {} reason: {}",
                     
PrintInstanceStandardInfo(task->query_context()->query_id(),
-                                              
task->fragment_context()->get_fragment_id(),
                                               
task->fragment_context()->get_fragment_instance_id()),
                     status.msg());
            // Print detailed information below when debugging here.
@@ -296,11 +296,22 @@ void TaskScheduler::_do_work(size_t index) {
                 fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                      "finalize fail:" + status.msg());
             } else {
+                VLOG_DEBUG << fmt::format(
+                        "Try close task: {}, fragment_ctx->is_canceled(): {}",
+                        PrintInstanceStandardInfo(
+                                task->query_context()->query_id(),
+                                task->fragment_context()->get_fragment_instance_id()),
+                        fragment_ctx->is_canceled());
                 _try_close_task(task,
                                fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
                                                            : PipelineTaskState::FINISHED,
                                 status);
             }
+            VLOG_DEBUG << fmt::format(
+                    "Task {} is eos, status {}.",
+                    PrintInstanceStandardInfo(task->query_context()->query_id(),
+                                              task->fragment_context()->get_fragment_instance_id()),
+                    get_state_name(task->get_state()));
             continue;
         }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1fb68049e6e..83b57c28fd7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -415,6 +415,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     }
 
     if (!rpc_status.ok()) {
+        LOG_INFO("Going to cancel instance {} since report exec status got rpc 
failed: {}",
+                 print_id(req.fragment_instance_id), rpc_status.to_string());
         // we need to cancel the execution of this fragment
         static_cast<void>(req.update_fn(rpc_status));
        req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, rpc_status.msg());
@@ -425,15 +427,13 @@ static void empty_function(RuntimeState*, Status*) {}
 
void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
                                const FinishCallback& cb) {
-    std::string func_name {"PlanFragmentExecutor::_exec_actual"};
 #ifndef BE_TEST
     SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
 #endif
 
-    LOG_INFO(func_name)
-            .tag("query_id", fragment_executor->query_id())
-            .tag("instance_id", fragment_executor->fragment_instance_id())
-            .tag("pthread_id", (uintptr_t)pthread_self());
+    LOG_INFO("Instance {} executing",
+             PrintInstanceStandardInfo(fragment_executor->query_id(),
+                                       fragment_executor->fragment_instance_id()));
 
     Status st = fragment_executor->execute();
     if (!st.ok()) {
@@ -452,8 +452,13 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
     {
         std::lock_guard<std::mutex> lock(_lock);
         
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
+
+        LOG_INFO("Instance {} finished",
+                 PrintInstanceStandardInfo(fragment_executor->query_id(),
+                                           fragment_executor->fragment_instance_id()));
         if (all_done && query_ctx) {
             _query_ctx_map.erase(query_ctx->query_id());
+            LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
         }
     }
 
@@ -560,11 +565,12 @@ void FragmentMgr::remove_pipeline_context(
     f_context->instance_ids(ins_ids);
     bool all_done = q_context->countdown(ins_ids.size());
     for (const auto& ins_id : ins_ids) {
-        VLOG_DEBUG << "remove pipeline context " << print_id(ins_id) << ", 
all_done:" << all_done;
+        LOG_INFO("Removing query {} instance {}, all done? {}", 
print_id(query_id),
+                 print_id(ins_id), all_done);
         _pipeline_map.erase(ins_id);
     }
     if (all_done) {
-        LOG(INFO) << "remove query context " << print_id(query_id);
+        LOG_INFO("Query {} finished", print_id(query_id));
         _query_ctx_map.erase(query_id);
     }
 }
@@ -800,11 +806,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
     cur_span->SetAttribute("query_id", print_id(params.query_id));
 
-    VLOG_ROW << "exec_plan_fragment params is "
+    VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment 
params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
    // will truncate the log line, so print query options separately for debugging purpose
-    VLOG_ROW << "query options is "
+    VLOG_ROW << "query: " << print_id(params.query_id) << "query options is "
              << 
apache::thrift::ThriftDebugString(params.query_options).c_str();
 
     std::shared_ptr<QueryContext> query_ctx;
@@ -1075,6 +1081,7 @@ void FragmentMgr::cancel_worker() {
             }
            for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
+                    LOG_INFO("Query {} is timeout", print_id(it->first));
                     it = _query_ctx_map.erase(it);
                 } else {
                     ++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index e96b086ad44..78d48327abf 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -56,6 +56,7 @@
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
+#include "util/debug_util.h"
 #include "util/defer_op.h"
 #include "util/pretty_printer.h"
 #include "util/telemetry/telemetry.h"
@@ -264,10 +265,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
 
 Status PlanFragmentExecutor::open() {
     int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
-    LOG_INFO("PlanFragmentExecutor::open")
-            .tag("query_id", _query_ctx->query_id())
-            .tag("instance_id", _runtime_state->fragment_instance_id())
-            .tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES));
+    LOG_INFO("PlanFragmentExecutor::open {}, mem_limit {}",
+             PrintInstanceStandardInfo(_query_ctx->query_id(), _fragment_instance_id),
+             PrettyPrinter::print(mem_limit, TUnit::BYTES));
 
    // we need to start the profile-reporting thread before calling Open(), since it
     // may block
@@ -592,11 +592,9 @@ void PlanFragmentExecutor::stop_report_thread() {
 
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
     std::lock_guard<std::mutex> l(_status_lock);
-    LOG_INFO("PlanFragmentExecutor::cancel")
-            .tag("query_id", print_id(_query_ctx->query_id()))
-            .tag("instance_id", 
print_id(_runtime_state->fragment_instance_id()))
-            .tag("reason", reason)
-            .tag("error message", msg);
+    LOG_INFO("PlanFragmentExecutor::cancel {} reason {} error msg {}",
+             PrintInstanceStandardInfo(query_id(), fragment_instance_id()), reason, msg);
+
    // NOTE: No need to check if already cancelled.
     // Bug scenario: test_array_map_function.groovy:
    //      select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from test.array_test2 where id > 10 order by id
@@ -680,8 +678,6 @@ void PlanFragmentExecutor::close() {
             }
             LOG(INFO) << ss.str();
         }
-        LOG(INFO) << "Close() fragment_instance_id="
-                  << print_id(_runtime_state->fragment_instance_id());
     }
 
     profile()->add_to_span(_span);
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index bb39596ff10..5426626da32 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -146,6 +146,8 @@ public:
 
     TUniqueId query_id() const { return _query_ctx->query_id(); }
 
+    int fragment_id() const { return _fragment_id; }
+
     bool is_timeout(const VecDateTimeValue& now) const;
 
     bool is_canceled() { return _runtime_state->is_cancelled(); }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4ec3517740d..c51e92d14c8 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
 
 #include <atomic>
 #include <memory>
+#include <sstream>
 #include <string>
 
 #include "common/config.h"
@@ -81,9 +82,10 @@ public:
        // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
        // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
         // on query mem tracker, released on other trackers.
+        std::string mem_tracker_msg {""};
         if (query_mem_tracker->peak_consumption() != 0) {
-            LOG(INFO) << fmt::format(
-                    "Deregister query/load memory tracker, queryId={}, 
Limit={}, CurrUsed={}, "
+            mem_tracker_msg = fmt::format(
+                    ", deregister query/load memory tracker, queryId={}, 
Limit={}, CurrUsed={}, "
                     "PeakUsed={}",
                    print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
                     MemTracker::print_bytes(query_mem_tracker->consumption()),
@@ -92,6 +94,8 @@ public:
         if (_task_group) {
             _task_group->remove_mem_tracker_limiter(query_mem_tracker);
         }
+
+        LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), 
mem_tracker_msg);
     }
 
    // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 65256b98dd4..29ef581947c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -181,7 +181,7 @@ public:
        // Create an error status, so that we could print error stack, and
         // we could know which path call cancel.
         LOG(WARNING) << "Task is cancelled, instance: "
-                     << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id)
+                     << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                      << " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
     }
 
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 2d44c281a5d..68ea21f02dd 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -128,9 +128,9 @@ std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
     return ss.str();
 }
 
-std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid) {
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId& iid) {
     std::stringstream ss;
-    ss << print_id(iid) << '|' << fid << '|' << print_id(qid);
+    ss << print_id(iid) << '|' << print_id(qid);
     return ss.str();
 }
 
diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h
index e6b6491b8a9..31cc1f8f5ca 100644
--- a/be/src/util/debug_util.h
+++ b/be/src/util/debug_util.h
@@ -38,7 +38,7 @@ std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos);
// A desirable scenario would be to call this function whenever we need to print instance information.
// By using a fixed format, we would be able to identify all the paths in which this instance is executed.
// InstanceId|QueryId
-std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid);
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId& iid);
 
 // Returns a string "<product version number> (build <build hash>)"
 // If compact == false, this string is appended: "\nBuilt on <build time>"
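
A call site of the trimmed helper now reads roughly as below (a minimal sketch:
the tag_for_log wrapper is hypothetical, and the two TUniqueId values are the
Thrift-generated query and instance ids):

    #include "util/debug_util.h"

    // The two-argument form prints "InstanceId|QueryId"; the removed
    // three-argument form carried the fragment index between the two ids.
    std::string tag_for_log(const TUniqueId& qid, const TUniqueId& iid) {
        return PrintInstanceStandardInfo(qid, iid);
    }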
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 5c87059c3dd..19db8ca15b1 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -60,7 +60,7 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging,
        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) {
     DCHECK(profile != nullptr);
-    VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
+    VLOG_FILE << "creating receiver for fragment=" << 
print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
            this, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
@@ -75,7 +75,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id,
                                                             PlanNodeId node_id,
                                                             bool acquire_lock) {
-    VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id << 
", node=" << node_id;
+    VLOG_ROW << "looking up fragment_instance_id=" << 
print_id(fragment_instance_id)
+             << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     // Create lock guard and not own lock currently and will lock conditionally
     std::unique_lock recvr_lock(_lock, std::defer_lock);
@@ -141,7 +142,7 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
 
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
     std::shared_ptr<VDataStreamRecvr> targert_recvr;
-    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << 
fragment_instance_id
+    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << 
print_id(fragment_instance_id)
                << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     {
@@ -168,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
         return Status::OK();
     } else {
         std::stringstream err;
-        err << "unknown row receiver id: fragment_instance_id=" << 
fragment_instance_id
+        err << "unknown row receiver id: fragment_instance_id=" << 
print_id(fragment_instance_id)
             << " node_id=" << node_id;
         LOG(ERROR) << err.str();
         return Status::InternalError(err.str());
@@ -176,7 +177,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
 }
 
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
-    VLOG_QUERY << "cancelling all streams for fragment=" << 
fragment_instance_id;
+    VLOG_QUERY << "cancelling all streams for fragment=" << 
print_id(fragment_instance_id);
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -187,7 +188,7 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_s
             if (recvr == nullptr) {
                 // keep going but at least log it
                 std::stringstream err;
-                err << "cancel(): missing in stream_map: fragment=" << i->first
+                err << "cancel(): missing in stream_map: fragment=" << 
print_id(i->first)
                     << " node=" << i->second;
                 LOG(ERROR) << err.str();
             } else {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 17f39756459..b842cbeea48 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -69,7 +69,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     std::unique_lock<std::mutex> l(_lock);
     // wait until something shows up or we know we're done
    while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
-        VLOG_ROW << "wait arrival fragment_instance_id=" << 
_recvr->fragment_instance_id()
+        VLOG_ROW << "wait arrival fragment_instance_id=" << 
print_id(_recvr->fragment_instance_id())
                  << " node=" << _recvr->dest_node_id();
         // Don't count time spent waiting on the sender as active time.
         CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
@@ -263,8 +263,9 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
     _sender_eos_set.insert(be_number);
     DCHECK_GT(_num_remaining_senders, 0);
     _num_remaining_senders--;
-    VLOG_FILE << "decremented senders: fragment_instance_id=" << 
_recvr->fragment_instance_id()
-              << " node_id=" << _recvr->dest_node_id() << " #senders=" << 
_num_remaining_senders;
+    VLOG_FILE << "decremented senders: fragment_instance_id="
+              << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
+              << " #senders=" << _num_remaining_senders;
     if (_num_remaining_senders == 0) {
         if (_dependency) {
             _dependency->set_always_done();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index bb9e1c8907a..70ff5da18f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -63,6 +64,8 @@ public class ExecutionProfile {
     // instance id -> dummy value
     private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
 
+    private int waitCount = 0;
+
     private TUniqueId queryId;
 
     public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
@@ -127,7 +130,11 @@ public class ExecutionProfile {
 
     public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
         if (profileDoneSignal != null) {
-            profileDoneSignal.markedCountDown(fragmentInstanceId, -1L);
+            if (profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
+                LOG.info("Mark instance {} done succeeded", DebugUtil.printId(fragmentInstanceId));
+            } else {
+                LOG.warn("Mark instance {} done failed", DebugUtil.printId(fragmentInstanceId));
+            }
         }
     }
 
@@ -135,6 +142,16 @@ public class ExecutionProfile {
         if (profileDoneSignal == null) {
             return true;
         }
+
+        waitCount++;
+
+        for (Entry<TUniqueId, Long> entry : profileDoneSignal.getLeftMarks()) {
+            if (waitCount > 2) {
+                LOG.info("Query {} waiting instance {}, waitCount: {}",
+                        DebugUtil.printId(queryId), DebugUtil.printId(entry.getKey()), waitCount);
+            }
+        }
+
         return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
index 994f5e36055..82031e32ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
@@ -105,7 +105,7 @@ public class Transaction {
 
             coordinator.exec();
             int execTimeout = ctx.getExecTimeout();
-            LOG.debug("Insert execution timeout:{}", execTimeout);
+            LOG.info("Insert {} execution timeout:{}", 
DebugUtil.printId(ctx.queryId()), execTimeout);
             boolean notTimeout = coordinator.join(execTimeout);
             if (!coordinator.isDone()) {
                 coordinator.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c84fb90f2a3..752dedf8015 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -530,13 +530,18 @@ public class Coordinator implements CoordInterface {
 
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Query {} idToBackend size={}", 
DebugUtil.printId(queryId), idToBackend.size());
+            int backendNum = idToBackend.size();
+            StringBuilder backendInfos = new StringBuilder("backends info:");
             for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
                 Long backendID = entry.getKey();
                 Backend backend = entry.getValue();
-                LOG.debug("Query {}, backend: {}-{}-{}-{}", 
DebugUtil.printId(queryId),
-                                    backendID, backend.getHost(), 
backend.getBePort(), backend.getProcessEpoch());
+                backendInfos.append(' ').append(backendID).append("-")
+                            .append(backend.getHost()).append("-")
+                            .append(backend.getBePort()).append("-")
+                            .append(backend.getProcessEpoch());
             }
+            LOG.debug("query {}, backend size: {}, {}",
+                    DebugUtil.printId(queryId), backendNum, backendInfos.toString());
         }
     }
 
@@ -631,7 +636,7 @@ public class Coordinator implements CoordInterface {
             resultInternalServiceAddr = toBrpcHost(execBeAddr);
             resultOutputExprs = fragments.get(0).getOutputExprs();
             if (LOG.isDebugEnabled()) {
-                LOG.debug("dispatch query job: {} to {}", 
DebugUtil.printId(queryId),
+                LOG.debug("dispatch result sink of query {} to {}", 
DebugUtil.printId(queryId),
                         topParams.instanceExecParams.get(0).host);
             }
 
@@ -857,7 +862,8 @@ public class Coordinator implements CoordInterface {
                     }
                 }
 
-                // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
+                // 3. group PipelineExecContext by BE.
+                // So that we can use one RPC to send all fragment instances of a BE.
                for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
                    Long backendId = this.addressToBackendID.get(entry.getKey());
                    PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
@@ -905,6 +911,16 @@ public class Coordinator implements CoordInterface {
                    span = ConnectContext.get().getTracer().spanBuilder("execRemoteFragmentsAsync")
                            .setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
                 }
+
+                if (LOG.isDebugEnabled()) {
+                    String infos = "";
+                    for (PipelineExecContext pec : ctxs.ctxs) {
+                        infos += pec.fragmentId + " ";
+                    }
+                    LOG.debug("query {}, sending pipeline fragments: {} to be 
{} bprc address {}",
+                            DebugUtil.printId(queryId), infos, ctxs.beId, 
ctxs.brpcAddr.toString());
+                }
+
                 ctxs.scopedSpan = new ScopedSpan(span);
                 ctxs.unsetFields();
                 BackendServiceProxy proxy = BackendServiceProxy.getInstance();
@@ -2415,9 +2431,12 @@ public class Coordinator implements CoordInterface {
             if (params.isSetErrorTabletInfos()) {
                 updateErrorTabletInfos(params.getErrorTabletInfos());
             }
+
             Preconditions.checkArgument(params.isSetDetailedReport());
             for (TDetailedReportParams param : params.detailed_report) {
                if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
+                    LOG.debug("Query {} instance {} is marked done",
+                            DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
                     
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
                 }
             }
@@ -2442,9 +2461,11 @@ public class Coordinator implements CoordInterface {
             // and returned_all_results_ is true.
            // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
            if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
-                LOG.warn("one instance report fail, query_id={} 
instance_id={}, error message: {}",
-                        DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                        status.getErrorMsg());
+                LOG.warn("one instance report fail, query_id={} fragment_id={} 
instance_id={}, be={},"
+                                  + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.getErrorMsg());
                 updateStatus(status, params.getFragmentInstanceId());
             }
            if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) {
@@ -2466,11 +2487,17 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
+                LOG.debug("Query {} instance {} is marked done",
+                         DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
                executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+            } else {
+                LOG.debug("Query {} instance {} is not marked done",
+                         DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
             }
         } else {
             if (params.backend_num >= backendExecStates.size()) {
-                LOG.warn("unknown backend number: {}, expected less than: {}",
+                LOG.warn("Query {} instance {} unknown backend number: {}, 
expected less than: {}",
+                        DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
                         params.backend_num, backendExecStates.size());
                 return;
             }
@@ -2518,7 +2545,12 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
+                LOG.info("Query {} instance {} is marked done",
+                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
                executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+            } else {
+                LOG.info("Query {} instance {} is not marked done",
+                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
             }
         }
 
@@ -3121,9 +3153,10 @@ public class Coordinator implements CoordInterface {
             for (TPipelineInstanceParams localParam : rpcParams.local_params) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("cancelRemoteFragments initiated={} done={} 
hasCanceled={} backend: {},"
-                                    + " fragment instance id={}, reason: {}",
+                                    + " fragment instance id={} query={}, 
reason: {}",
                            this.initiated, this.done, this.hasCanceled, backend.getId(),
-                            DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name());
+                            DebugUtil.printId(localParam.fragment_instance_id),
+                            DebugUtil.printId(queryId), cancelReason.name());
                 }
 
                RuntimeProfile profile = fragmentInstancesMap.get(localParam.fragment_instance_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index f6eaaea2544..15c93411f6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -140,9 +140,8 @@ public final class QeProcessorImpl implements QeProcessor {
     public void unregisterQuery(TUniqueId queryId) {
         QueryInfo queryInfo = coordinatorMap.remove(queryId);
         if (queryInfo != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("deregister query id {}", 
DebugUtil.printId(queryId));
-            }
+            LOG.info("Deregister query id {}", DebugUtil.printId(queryId));
+
             if (queryInfo.getConnectContext() != null
                    && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
             ) {
@@ -160,9 +159,7 @@ public final class QeProcessorImpl implements QeProcessor {
                 }
             }
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("not found query {} when unregisterQuery", 
DebugUtil.printId(queryId));
-            }
+            LOG.warn("not found query {} when unregisterQuery", 
DebugUtil.printId(queryId));
         }
 
        // commit hive transaction if needed
@@ -193,6 +190,10 @@ public final class QeProcessorImpl implements QeProcessor {
 
     @Override
    public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
+        LOG.info("Processing report exec status, query {} instance {} from {}",
+                DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id),
+                beAddr.toString());
+
         if (params.isSetProfile()) {
             LOG.info("ReportExecStatus(): fragment_instance_id={}, query 
id={}, backend num: {}, ip: {}",
                     DebugUtil.printId(params.fragment_instance_id), 
DebugUtil.printId(params.query_id),
@@ -219,7 +220,8 @@ public final class QeProcessorImpl implements QeProcessor {
                writeProfileExecutor.submit(new WriteProfileTask(params, info));
             }
         } catch (Exception e) {
-            LOG.warn(e.getMessage());
+            LOG.warn("Report response: {}, query: {}, instance: {}", 
result.toString(),
+                    DebugUtil.printId(params.query_id), 
DebugUtil.printId(params.fragment_instance_id));
             return result;
         }
         result.setStatus(new TStatus(TStatusCode.OK));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index b0c4d70ca74..519fb6f9bc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -204,7 +204,6 @@ public class SimpleScheduler {
                 try {
                     Thread.sleep(1000L);
                    SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
-                    LOG.debug("UpdateBlacklistThread retry begin");
 
                    Iterator<Map.Entry<Long, Pair<Integer, String>>> iterator = blacklistBackends.entrySet().iterator();
                     while (iterator.hasNext()) {
@@ -227,9 +226,6 @@ public class SimpleScheduler {
                             }
                         }
                     }
-
-                    LOG.debug("UpdateBlacklistThread retry end");
-
                 } catch (Throwable ex) {
                     LOG.warn("blacklist thread exception", ex);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ee974296912..92fd39f89fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1353,6 +1353,8 @@ public class StmtExecutor {
 
     // Process a select statement.
     private void handleQueryStmt() throws Exception {
+        LOG.info("Handling query {} with query id {}",
+                          originStmt.originStmt, DebugUtil.printId(context.queryId));
         // Every time set no send flag and clean all data in buffer
         context.getMysqlChannel().reset();
         Queriable queryStmt = (Queriable) parsedStmt;
@@ -1369,6 +1371,7 @@ public class StmtExecutor {
         if (queryStmt.isExplain()) {
            String explainString = planner.getExplainString(queryStmt.getExplainOptions());
             handleExplainStmt(explainString, false);
+            LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
             return;
         }
 
@@ -1376,6 +1379,7 @@ public class StmtExecutor {
         Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
         if (resultSet.isPresent()) {
             sendResultSet(resultSet.get());
+            LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
             return;
         }
 
@@ -1389,6 +1393,7 @@ public class StmtExecutor {
                 && context.getSessionVariable().getDefaultOrderByLimit() < 0) {
            if (queryStmt instanceof QueryStmt || queryStmt instanceof LogicalPlanAdapter) {
                 handleCacheStmt(cacheAnalyzer, channel);
+                LOG.info("Query {} finished", 
DebugUtil.printId(context.queryId));
                 return;
             }
         }
@@ -1400,11 +1405,13 @@ public class StmtExecutor {
                 LOG.info("ignore handle limit 0 ,sql:{}", 
parsedSelectStmt.toSql());
                sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
                 context.getState().setEof();
+                LOG.info("Query {} finished", 
DebugUtil.printId(context.queryId));
                 return;
             }
         }
 
         sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+        LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
     }
 
    private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel,
@@ -1893,10 +1900,10 @@ public class StmtExecutor {
 
                 coord.exec();
                 int execTimeout = context.getExecTimeout();
-                LOG.debug("Insert execution timeout:{}", execTimeout);
+                LOG.debug("Insert {} execution timeout:{}", 
DebugUtil.printId(context.queryId()), execTimeout);
                 boolean notTimeout = coord.join(execTimeout);
                 if (!coord.isDone()) {
-                    coord.cancel();
+                    coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
                     if (notTimeout) {
                         errMsg = coord.getExecStatus().getErrorMsg();
                         ErrorReport.reportDdlException("There exists unhealthy 
backend. "

