This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d6c64d305f0 [chore](log) Add log to trace query execution #25739
d6c64d305f0 is described below
commit d6c64d305f0e906b2fd044db8469b6551716d509
Author: zhiqiang <[email protected]>
AuthorDate: Thu Oct 26 01:09:25 2023 -0500
[chore](log) Add log to trace query execution #25739
---
be/src/agent/task_worker_pool.cpp | 3 --
be/src/exec/exec_node.cpp | 7 +--
be/src/pipeline/pipeline_fragment_context.cpp | 24 +++++-----
be/src/pipeline/pipeline_fragment_context.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 15 +++++-
be/src/runtime/fragment_mgr.cpp | 25 ++++++----
be/src/runtime/plan_fragment_executor.cpp | 18 +++----
be/src/runtime/plan_fragment_executor.h | 2 +
be/src/runtime/query_context.h | 8 +++-
be/src/runtime/runtime_state.h | 2 +-
be/src/util/debug_util.cpp | 4 +-
be/src/util/debug_util.h | 2 +-
be/src/vec/runtime/vdata_stream_mgr.cpp | 13 ++---
be/src/vec/runtime/vdata_stream_recvr.cpp | 7 +--
.../doris/common/profile/ExecutionProfile.java | 19 +++++++-
.../org/apache/doris/nereids/txn/Transaction.java | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 55 +++++++++++++++++-----
.../java/org/apache/doris/qe/QeProcessorImpl.java | 16 ++++---
.../java/org/apache/doris/qe/SimpleScheduler.java | 4 --
.../java/org/apache/doris/qe/StmtExecutor.java | 11 ++++-
20 files changed, 157 insertions(+), 82 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index ff22f7923c3..6717af3ce4b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1071,9 +1071,6 @@ void TaskWorkerPool::_handle_report(const TReportRequest&
request, ReportType ty
.error(result.status);
} else {
is_report_success = true;
- LOG_INFO("successfully report {}", TYPE_STRING(type))
- .tag("host", _master_info.network_address.hostname)
- .tag("port", _master_info.network_address.port);
}
switch (type) {
case TASK:
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed8b11457a6..16964264edf 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -208,12 +208,13 @@ void ExecNode::release_resource(doris::RuntimeState*
state) {
Status ExecNode::close(RuntimeState* state) {
if (_is_closed) {
- LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ LOG(INFO) << "query= " << print_id(state->query_id())
+ << " fragment_instance_id=" <<
print_id(state->fragment_instance_id())
<< " already closed";
return Status::OK();
}
- LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << ", "
- << " id=" << _id << " type=" << print_plan_node_type(_type) << "
closed";
+ LOG(INFO) << "query= " << print_id(state->query_id())
+ << " fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;
Status result;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 709e4df7652..686ff0fe0ad 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -166,8 +166,7 @@ void PipelineFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
LOG(WARNING) << "PipelineFragmentContext "
- << PrintInstanceStandardInfo(_query_id, _fragment_id,
- _fragment_instance_id)
+ << PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
<< " is canceled, cancel message: " << msg;
} else {
@@ -217,12 +216,9 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
tracer = telemetry::get_tracer(print_id(_query_id));
}
- LOG_INFO("PipelineFragmentContext::prepare")
- .tag("query_id", print_id(_query_id))
- .tag("fragment_id", _fragment_id)
- .tag("instance_id", print_id(local_params.fragment_instance_id))
- .tag("backend_num", local_params.backend_num)
- .tag("pthread_id", (uintptr_t)pthread_self());
+ LOG_INFO("Preparing instance {}, backend_num {}",
+ PrintInstanceStandardInfo(_query_id,
local_params.fragment_instance_id),
+ local_params.backend_num);
// 1. init _runtime_state
_runtime_state = RuntimeState::create_unique(local_params,
request.query_id,
@@ -284,8 +280,9 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
std::vector<ExecNode*> scan_nodes;
std::vector<TScanRangeParams> no_scan_ranges;
_root_plan->collect_scan_nodes(&scan_nodes);
- VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size();
- VLOG_CRITICAL << "params.per_node_scan_ranges.size()="
+ VLOG_CRITICAL << "query " << print_id(get_query_id())
+ << " scan_nodes.size()=" << scan_nodes.size();
+ VLOG_CRITICAL << "query " << print_id(get_query_id()) << "
params.per_node_scan_ranges.size()="
<< local_params.per_node_scan_ranges.size();
// set scan range in ScanNode
@@ -310,7 +307,8 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
- VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
+ VLOG_CRITICAL << "query " << print_id(get_query_id())
+ << " scan_node_Id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}
}
@@ -750,7 +748,9 @@ void PipelineFragmentContext::close_if_prepare_failed() {
}
for (auto& task : _tasks) {
DCHECK(!task->is_pending_finish());
- WARN_IF_ERROR(task->close(Status::OK()), "close_if_prepare_failed
failed: ");
+ std::stringstream msg;
+ msg << "query " << print_id(_query_id) << " closed since prepare
failed";
+ WARN_IF_ERROR(task->close(Status::OK()), msg.str());
close_a_pipeline();
}
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index ffbfaef987a..85834e513f9 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,7 +71,7 @@ public:
PipelinePtr add_pipeline();
- TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
+ TUniqueId get_fragment_instance_id() const { return _fragment_instance_id;
}
virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/)
{
return _runtime_state.get();
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index ee6f5cdd829..4ef5def6f1d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -31,6 +31,7 @@
#include <string>
#include <thread>
+#include "common/logging.h"
#include "common/signal_handler.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"
@@ -111,9 +112,9 @@ void BlockedTaskScheduler::_schedule() {
if (state == PipelineTaskState::PENDING_FINISH) {
// should cancel or should finish
if (task->is_pending_finish()) {
- VLOG_DEBUG << "Task pending" << task->debug_string();
iter++;
} else {
+ VLOG_DEBUG << "Task pending" << task->debug_string();
_make_task_run(local_blocked_tasks, iter,
PipelineTaskState::PENDING_FINISH);
}
} else if (task->query_context()->is_cancelled()) {
@@ -272,7 +273,6 @@ void TaskScheduler::_do_work(size_t index) {
LOG(WARNING) << fmt::format(
"Pipeline task failed. query_id: {} reason: {}",
PrintInstanceStandardInfo(task->query_context()->query_id(),
-
task->fragment_context()->get_fragment_id(),
task->fragment_context()->get_fragment_instance_id()),
status.msg());
// Print detailed information below when debugging here.
@@ -296,11 +296,22 @@ void TaskScheduler::_do_work(size_t index) {
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"finalize fail:" + status.msg());
} else {
+ VLOG_DEBUG << fmt::format(
+ "Try close task: {}, fragment_ctx->is_canceled(): {}",
+ PrintInstanceStandardInfo(
+ task->query_context()->query_id(),
+
task->fragment_context()->get_fragment_instance_id()),
+ fragment_ctx->is_canceled());
_try_close_task(task,
fragment_ctx->is_canceled() ?
PipelineTaskState::CANCELED
:
PipelineTaskState::FINISHED,
status);
}
+ VLOG_DEBUG << fmt::format(
+ "Task {} is eos, status {}.",
+
PrintInstanceStandardInfo(task->query_context()->query_id(),
+
task->fragment_context()->get_fragment_instance_id()),
+ get_state_name(task->get_state()));
continue;
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1fb68049e6e..83b57c28fd7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -415,6 +415,8 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
if (!rpc_status.ok()) {
+ LOG_INFO("Going to cancel instance {} since report exec status got rpc
failed: {}",
+ print_id(req.fragment_instance_id), rpc_status.to_string());
// we need to cancel the execution of this fragment
static_cast<void>(req.update_fn(rpc_status));
req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR,
rpc_status.msg());
@@ -425,15 +427,13 @@ static void empty_function(RuntimeState*, Status*) {}
void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor>
fragment_executor,
const FinishCallback& cb) {
- std::string func_name {"PlanFragmentExecutor::_exec_actual"};
#ifndef BE_TEST
SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
#endif
- LOG_INFO(func_name)
- .tag("query_id", fragment_executor->query_id())
- .tag("instance_id", fragment_executor->fragment_instance_id())
- .tag("pthread_id", (uintptr_t)pthread_self());
+ LOG_INFO("Instance {} executing",
+ PrintInstanceStandardInfo(fragment_executor->query_id(),
+
fragment_executor->fragment_instance_id()));
Status st = fragment_executor->execute();
if (!st.ok()) {
@@ -452,8 +452,13 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
+
+ LOG_INFO("Instance {} finished",
+ PrintInstanceStandardInfo(fragment_executor->query_id(),
+
fragment_executor->fragment_instance_id()));
if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id());
+ LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
}
@@ -560,11 +565,12 @@ void FragmentMgr::remove_pipeline_context(
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
for (const auto& ins_id : ins_ids) {
- VLOG_DEBUG << "remove pipeline context " << print_id(ins_id) << ",
all_done:" << all_done;
+ LOG_INFO("Removing query {} instance {}, all done? {}",
print_id(query_id),
+ print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
}
if (all_done) {
- LOG(INFO) << "remove query context " << print_id(query_id);
+ LOG_INFO("Query {} finished", print_id(query_id));
_query_ctx_map.erase(query_id);
}
}
@@ -800,11 +806,11 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
cur_span->SetAttribute("query_id", print_id(params.query_id));
- VLOG_ROW << "exec_plan_fragment params is "
+ VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment
params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options separately for
debugging purpose
- VLOG_ROW << "query options is "
+ VLOG_ROW << "query: " << print_id(params.query_id) << " query options is "
<<
apache::thrift::ThriftDebugString(params.query_options).c_str();
std::shared_ptr<QueryContext> query_ctx;
@@ -1075,6 +1081,7 @@ void FragmentMgr::cancel_worker() {
}
for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
+ LOG_INFO("Query {} is timeout", print_id(it->first));
it = _query_ctx_map.erase(it);
} else {
++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index e96b086ad44..78d48327abf 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -56,6 +56,7 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
+#include "util/debug_util.h"
#include "util/defer_op.h"
#include "util/pretty_printer.h"
#include "util/telemetry/telemetry.h"
@@ -264,10 +265,9 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
Status PlanFragmentExecutor::open() {
int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
- LOG_INFO("PlanFragmentExecutor::open")
- .tag("query_id", _query_ctx->query_id())
- .tag("instance_id", _runtime_state->fragment_instance_id())
- .tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES));
+ LOG_INFO("PlanFragmentExecutor::open {}, mem_limit {}",
+ PrintInstanceStandardInfo(_query_ctx->query_id(),
_fragment_instance_id),
+ PrettyPrinter::print(mem_limit, TUnit::BYTES));
// we need to start the profile-reporting thread before calling Open(),
since it
// may block
@@ -592,11 +592,9 @@ void PlanFragmentExecutor::stop_report_thread() {
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
std::lock_guard<std::mutex> l(_status_lock);
- LOG_INFO("PlanFragmentExecutor::cancel")
- .tag("query_id", print_id(_query_ctx->query_id()))
- .tag("instance_id",
print_id(_runtime_state->fragment_instance_id()))
- .tag("reason", reason)
- .tag("error message", msg);
+ LOG_INFO("PlanFragmentExecutor::cancel {} reason {} error msg {}",
+ PrintInstanceStandardInfo(query_id(), fragment_instance_id()),
reason, msg);
+
// NOTE: Not need to check if already cancelled.
// Bug scenario: test_array_map_function.groovy:
// select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/
array_map((x,y)->x+y, c_array1, c_array2) from test.array_test2 where id > 10
order by id
@@ -680,8 +678,6 @@ void PlanFragmentExecutor::close() {
}
LOG(INFO) << ss.str();
}
- LOG(INFO) << "Close() fragment_instance_id="
- << print_id(_runtime_state->fragment_instance_id());
}
profile()->add_to_span(_span);
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index bb39596ff10..5426626da32 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -146,6 +146,8 @@ public:
TUniqueId query_id() const { return _query_ctx->query_id(); }
+ int fragment_id() const { return _fragment_id; }
+
bool is_timeout(const VecDateTimeValue& now) const;
bool is_canceled() { return _runtime_state->is_cancelled(); }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4ec3517740d..c51e92d14c8 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
#include <atomic>
#include <memory>
+#include <sstream>
#include <string>
#include "common/config.h"
@@ -81,9 +82,10 @@ public:
// it is found that query already exists in _query_ctx_map, and query
mem tracker is not used.
// query mem tracker consumption is not equal to 0 after use, because
there is memory consumed
// on query mem tracker, released on other trackers.
+ std::string mem_tracker_msg {""};
if (query_mem_tracker->peak_consumption() != 0) {
- LOG(INFO) << fmt::format(
- "Deregister query/load memory tracker, queryId={},
Limit={}, CurrUsed={}, "
+ mem_tracker_msg = fmt::format(
+ ", deregister query/load memory tracker, queryId={},
Limit={}, CurrUsed={}, "
"PeakUsed={}",
print_id(_query_id),
MemTracker::print_bytes(query_mem_tracker->limit()),
MemTracker::print_bytes(query_mem_tracker->consumption()),
@@ -92,6 +94,8 @@ public:
if (_task_group) {
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
}
+
+ LOG_INFO("Query {} deconstructed{}", print_id(_query_id),
mem_tracker_msg);
}
// Notice. For load fragments, the fragment_num sent by FE has a small
probability of 0.
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 65256b98dd4..29ef581947c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -181,7 +181,7 @@ public:
// Create a error status, so that we could print error stack, and
// we could know which path call cancel.
LOG(WARNING) << "Task is cancelled, instance: "
- << PrintInstanceStandardInfo(_query_id, _fragment_id,
_fragment_instance_id)
+ << PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
<< " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
}
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 2d44c281a5d..68ea21f02dd 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -128,9 +128,9 @@ std::string PrintFrontendInfo(const TFrontendInfo& fe_info)
{
return ss.str();
}
-std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid,
const TUniqueId& iid) {
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId&
iid) {
std::stringstream ss;
- ss << print_id(iid) << '|' << fid << '|' << print_id(qid);
+ ss << print_id(iid) << '|' << print_id(qid);
return ss.str();
}
diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h
index e6b6491b8a9..31cc1f8f5ca 100644
--- a/be/src/util/debug_util.h
+++ b/be/src/util/debug_util.h
@@ -38,7 +38,7 @@ std::string PrintFrontendInfos(const
std::vector<TFrontendInfo>& fe_infos);
// A desirable scenario would be to call this function whenever we
need to print instance information.
// By using a fixed format, we would be able to identify all the paths in
which this instance is executed.
// InstanceId|QueryId
-std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid,
const TUniqueId& iid);
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId&
iid);
// Returns a string "<product version number> (build <build hash>)"
// If compact == false, this string is appended: "\nBuilt on <build time>"
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 5c87059c3dd..19db8ca15b1 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -60,7 +60,7 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::create_recvr(
PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
{
DCHECK(profile != nullptr);
- VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
+ VLOG_FILE << "creating receiver for fragment=" <<
print_id(fragment_instance_id)
<< ", node=" << dest_node_id;
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
this, state, row_desc, fragment_instance_id, dest_node_id,
num_senders, is_merging,
@@ -75,7 +75,8 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::create_recvr(
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId&
fragment_instance_id,
PlanNodeId
node_id,
bool
acquire_lock) {
- VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id <<
", node=" << node_id;
+ VLOG_ROW << "looking up fragment_instance_id=" <<
print_id(fragment_instance_id)
+ << ", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
// Create lock guard and not own lock currently and will lock conditionally
std::unique_lock recvr_lock(_lock, std::defer_lock);
@@ -141,7 +142,7 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id,
PlanNodeId node_id) {
std::shared_ptr<VDataStreamRecvr> targert_recvr;
- VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" <<
fragment_instance_id
+ VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" <<
print_id(fragment_instance_id)
<< ", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
{
@@ -168,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId&
fragment_instance_id, P
return Status::OK();
} else {
std::stringstream err;
- err << "unknown row receiver id: fragment_instance_id=" <<
fragment_instance_id
+ err << "unknown row receiver id: fragment_instance_id=" <<
print_id(fragment_instance_id)
<< " node_id=" << node_id;
LOG(ERROR) << err.str();
return Status::InternalError(err.str());
@@ -176,7 +177,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId&
fragment_instance_id, P
}
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status
exec_status) {
- VLOG_QUERY << "cancelling all streams for fragment=" <<
fragment_instance_id;
+ VLOG_QUERY << "cancelling all streams for fragment=" <<
print_id(fragment_instance_id);
std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
{
std::lock_guard<std::mutex> l(_lock);
@@ -187,7 +188,7 @@ void VDataStreamMgr::cancel(const TUniqueId&
fragment_instance_id, Status exec_s
if (recvr == nullptr) {
// keep going but at least log it
std::stringstream err;
- err << "cancel(): missing in stream_map: fragment=" << i->first
+ err << "cancel(): missing in stream_map: fragment=" <<
print_id(i->first)
<< " node=" << i->second;
LOG(ERROR) << err.str();
} else {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 17f39756459..b842cbeea48 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -69,7 +69,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block,
bool* eos) {
std::unique_lock<std::mutex> l(_lock);
// wait until something shows up or we know we're done
while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders >
0) {
- VLOG_ROW << "wait arrival fragment_instance_id=" <<
_recvr->fragment_instance_id()
+ VLOG_ROW << "wait arrival fragment_instance_id=" <<
print_id(_recvr->fragment_instance_id())
<< " node=" << _recvr->dest_node_id();
// Don't count time spent waiting on the sender as active time.
CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
@@ -263,8 +263,9 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int
be_number) {
_sender_eos_set.insert(be_number);
DCHECK_GT(_num_remaining_senders, 0);
_num_remaining_senders--;
- VLOG_FILE << "decremented senders: fragment_instance_id=" <<
_recvr->fragment_instance_id()
- << " node_id=" << _recvr->dest_node_id() << " #senders=" <<
_num_remaining_senders;
+ VLOG_FILE << "decremented senders: fragment_instance_id="
+ << print_id(_recvr->fragment_instance_id()) << " node_id=" <<
_recvr->dest_node_id()
+ << " #senders=" << _num_remaining_senders;
if (_num_remaining_senders == 0) {
if (_dependency) {
_dependency->set_always_done();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index bb9e1c8907a..70ff5da18f3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,8 @@ public class ExecutionProfile {
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
+ private int waitCount = 0;
+
private TUniqueId queryId;
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
@@ -127,7 +130,11 @@ public class ExecutionProfile {
public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
if (profileDoneSignal != null) {
- profileDoneSignal.markedCountDown(fragmentInstanceId, -1L);
+ if (profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
+ LOG.info("Mark instance {} done succeed",
DebugUtil.printId(fragmentInstanceId));
+ } else {
+ LOG.warn("Mark instance {} done failed",
DebugUtil.printId(fragmentInstanceId));
+ }
}
}
@@ -135,6 +142,16 @@ public class ExecutionProfile {
if (profileDoneSignal == null) {
return true;
}
+
+ waitCount++;
+
+ for (Entry<TUniqueId, Long> entry : profileDoneSignal.getLeftMarks()) {
+ if (waitCount > 2) {
+ LOG.info("Query {} waiting instance {}, waitCount: {}",
+ DebugUtil.printId(queryId),
DebugUtil.printId(entry.getKey()), waitCount);
+ }
+ }
+
return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
index 994f5e36055..82031e32ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
@@ -105,7 +105,7 @@ public class Transaction {
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
- LOG.debug("Insert execution timeout:{}", execTimeout);
+ LOG.info("Insert {} execution timeout:{}",
DebugUtil.printId(ctx.queryId()), execTimeout);
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c84fb90f2a3..752dedf8015 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -530,13 +530,18 @@ public class Coordinator implements CoordInterface {
this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
if (LOG.isDebugEnabled()) {
- LOG.debug("Query {} idToBackend size={}",
DebugUtil.printId(queryId), idToBackend.size());
+ int backendNum = idToBackend.size();
+ StringBuilder backendInfos = new StringBuilder("backends info:");
for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
Long backendID = entry.getKey();
Backend backend = entry.getValue();
- LOG.debug("Query {}, backend: {}-{}-{}-{}",
DebugUtil.printId(queryId),
- backendID, backend.getHost(),
backend.getBePort(), backend.getProcessEpoch());
+ backendInfos.append(' ').append(backendID).append("-")
+ .append(backend.getHost()).append("-")
+ .append(backend.getBePort()).append("-")
+ .append(backend.getProcessEpoch());
}
+ LOG.debug("query {}, backend size: {}, {}",
+ DebugUtil.printId(queryId), backendNum,
backendInfos.toString());
}
}
@@ -631,7 +636,7 @@ public class Coordinator implements CoordInterface {
resultInternalServiceAddr = toBrpcHost(execBeAddr);
resultOutputExprs = fragments.get(0).getOutputExprs();
if (LOG.isDebugEnabled()) {
- LOG.debug("dispatch query job: {} to {}",
DebugUtil.printId(queryId),
+ LOG.debug("dispatch result sink of query {} to {}",
DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
}
@@ -857,7 +862,8 @@ public class Coordinator implements CoordInterface {
}
}
- // 3. group BackendExecState by BE. So that we can use one RPC
to send all fragment instances of a BE.
+ // 3. group PipelineExecContext by BE.
+ // So that we can use one RPC to send all fragment instances
of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry
: tParams.entrySet()) {
Long backendId =
this.addressToBackendID.get(entry.getKey());
PipelineExecContext pipelineExecContext = new
PipelineExecContext(fragment.getFragmentId(),
@@ -905,6 +911,16 @@ public class Coordinator implements CoordInterface {
span =
ConnectContext.get().getTracer().spanBuilder("execRemoteFragmentsAsync")
.setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
}
+
+ if (LOG.isDebugEnabled()) {
+ String infos = "";
+ for (PipelineExecContext pec : ctxs.ctxs) {
+ infos += pec.fragmentId + " ";
+ }
+ LOG.debug("query {}, sending pipeline fragments: {} to be
{} brpc address {}",
+ DebugUtil.printId(queryId), infos, ctxs.beId,
ctxs.brpcAddr.toString());
+ }
+
ctxs.scopedSpan = new ScopedSpan(span);
ctxs.unsetFields();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
@@ -2415,9 +2431,12 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+
Preconditions.checkArgument(params.isSetDetailedReport());
for (TDetailedReportParams param : params.detailed_report) {
if
(ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
+ LOG.debug("Query {} instance {} is marked done",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
}
}
@@ -2442,9 +2461,11 @@ public class Coordinator implements CoordInterface {
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already
been initiated)
if (!(returnedAllResults && status.isCancelled()) && !status.ok())
{
- LOG.warn("one instance report fail, query_id={}
instance_id={}, error message: {}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.getErrorMsg());
+ LOG.warn("one instance report fail, query_id={} fragment_id={}
instance_id={}, be={},"
+ + " error message: {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(), status.getErrorMsg());
updateStatus(status, params.getFragmentInstanceId());
}
if
(ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) {
@@ -2466,11 +2487,17 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+ LOG.debug("Query {} instance {} is marked done",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+ } else {
+ LOG.debug("Query {} instance {} is not marked done",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
}
} else {
if (params.backend_num >= backendExecStates.size()) {
- LOG.warn("unknown backend number: {}, expected less than: {}",
+ LOG.warn("Query {} instance {} unknown backend number: {},
expected less than: {}",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
params.backend_num, backendExecStates.size());
return;
}
@@ -2518,7 +2545,12 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+ LOG.info("Query {} instance {} is marked done",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+ } else {
+ LOG.info("Query {} instance {} is not marked done",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
}
}
@@ -3121,9 +3153,10 @@ public class Coordinator implements CoordInterface {
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend: {},"
- + " fragment instance id={}, reason: {}",
+ + " fragment instance id={} query={},
reason: {}",
this.initiated, this.done, this.hasCanceled,
backend.getId(),
-
DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name());
+ DebugUtil.printId(localParam.fragment_instance_id),
+ DebugUtil.printId(queryId), cancelReason.name());
}
RuntimeProfile profile =
fragmentInstancesMap.get(localParam.fragment_instance_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index f6eaaea2544..15c93411f6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -140,9 +140,8 @@ public final class QeProcessorImpl implements QeProcessor {
public void unregisterQuery(TUniqueId queryId) {
QueryInfo queryInfo = coordinatorMap.remove(queryId);
if (queryInfo != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("deregister query id {}",
DebugUtil.printId(queryId));
- }
+ LOG.info("Deregister query id {}", DebugUtil.printId(queryId));
+
if (queryInfo.getConnectContext() != null
&&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
@@ -160,9 +159,7 @@ public final class QeProcessorImpl implements QeProcessor {
}
}
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("not found query {} when unregisterQuery",
DebugUtil.printId(queryId));
- }
+ LOG.warn("not found query {} when unregisterQuery",
DebugUtil.printId(queryId));
}
// commit hive transaction if needed
@@ -193,6 +190,10 @@ public final class QeProcessorImpl implements QeProcessor {
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams
params, TNetworkAddress beAddr) {
+ LOG.info("Processing report exec status, query {} instance {} from {}",
+ DebugUtil.printId(params.query_id),
DebugUtil.printId(params.fragment_instance_id),
+ beAddr.toString());
+
if (params.isSetProfile()) {
LOG.info("ReportExecStatus(): fragment_instance_id={}, query
id={}, backend num: {}, ip: {}",
DebugUtil.printId(params.fragment_instance_id),
DebugUtil.printId(params.query_id),
@@ -219,7 +220,8 @@ public final class QeProcessorImpl implements QeProcessor {
writeProfileExecutor.submit(new WriteProfileTask(params,
info));
}
} catch (Exception e) {
- LOG.warn(e.getMessage());
+ LOG.warn("Report response: {}, query: {}, instance: {}",
result.toString(),
+ DebugUtil.printId(params.query_id),
DebugUtil.printId(params.fragment_instance_id));
return result;
}
result.setStatus(new TStatus(TStatusCode.OK));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index b0c4d70ca74..519fb6f9bc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -204,7 +204,6 @@ public class SimpleScheduler {
try {
Thread.sleep(1000L);
SystemInfoService clusterInfoService =
Env.getCurrentSystemInfo();
- LOG.debug("UpdateBlacklistThread retry begin");
Iterator<Map.Entry<Long, Pair<Integer, String>>> iterator
= blacklistBackends.entrySet().iterator();
while (iterator.hasNext()) {
@@ -227,9 +226,6 @@ public class SimpleScheduler {
}
}
}
-
- LOG.debug("UpdateBlacklistThread retry end");
-
} catch (Throwable ex) {
LOG.warn("blacklist thread exception", ex);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ee974296912..92fd39f89fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1353,6 +1353,8 @@ public class StmtExecutor {
// Process a select statement.
private void handleQueryStmt() throws Exception {
+ LOG.info("Handling query {} with query id {}",
+ originStmt.originStmt,
DebugUtil.printId(context.queryId));
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
Queriable queryStmt = (Queriable) parsedStmt;
@@ -1369,6 +1371,7 @@ public class StmtExecutor {
if (queryStmt.isExplain()) {
String explainString =
planner.getExplainString(queryStmt.getExplainOptions());
handleExplainStmt(explainString, false);
+ LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
return;
}
@@ -1376,6 +1379,7 @@ public class StmtExecutor {
Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
if (resultSet.isPresent()) {
sendResultSet(resultSet.get());
+ LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
return;
}
@@ -1389,6 +1393,7 @@ public class StmtExecutor {
&& context.getSessionVariable().getDefaultOrderByLimit() < 0) {
if (queryStmt instanceof QueryStmt || queryStmt instanceof
LogicalPlanAdapter) {
handleCacheStmt(cacheAnalyzer, channel);
+ LOG.info("Query {} finished",
DebugUtil.printId(context.queryId));
return;
}
}
@@ -1400,11 +1405,13 @@ public class StmtExecutor {
LOG.info("ignore handle limit 0 ,sql:{}",
parsedSelectStmt.toSql());
sendFields(queryStmt.getColLabels(),
exprToType(queryStmt.getResultExprs()));
context.getState().setEof();
+ LOG.info("Query {} finished",
DebugUtil.printId(context.queryId));
return;
}
}
sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+ LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
}
private void sendResult(boolean isOutfileQuery, boolean isSendFields,
Queriable queryStmt, MysqlChannel channel,
@@ -1893,10 +1900,10 @@ public class StmtExecutor {
coord.exec();
int execTimeout = context.getExecTimeout();
- LOG.debug("Insert execution timeout:{}", execTimeout);
+ LOG.debug("Insert {} execution timeout:{}",
DebugUtil.printId(context.queryId()), execTimeout);
boolean notTimeout = coord.join(execTimeout);
if (!coord.isDone()) {
- coord.cancel();
+ coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
if (notTimeout) {
errMsg = coord.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("There exists unhealthy
backend. "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]