This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5104982614 [enhancement](tracing) append the profile counter to trace. (#11458)
5104982614 is described below
commit 5104982614009fb954070c3c07802391ceccc918
Author: luozenglin <[email protected]>
AuthorDate: Mon Aug 15 21:36:38 2022 +0800
[enhancement](tracing) append the profile counter to trace. (#11458)
1. Append the profile counters and info strings to the span attributes.
2. Output the trace ID to the audit log.
---
be/src/exec/data_sink.h | 1 +
be/src/exec/exec_node.cpp | 2 +
be/src/runtime/plan_fragment_executor.cpp | 1 +
be/src/util/runtime_profile.cpp | 85 +++++++++++++++++++++-
be/src/util/runtime_profile.h | 29 ++++++--
be/src/util/telemetry/telemetry.cpp | 6 +-
be/src/util/telemetry/telemetry.h | 13 ----
be/src/vec/exec/volap_scan_node.cpp | 19 +----
.../java/org/apache/doris/plugin/AuditEvent.java | 8 ++
.../java/org/apache/doris/qe/ConnectProcessor.java | 9 ++-
.../java/org/apache/doris/qe/StmtExecutor.java | 68 ++++++++++-------
11 files changed, 171 insertions(+), 70 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 952e51e5cd..423e60a74a 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -70,6 +70,7 @@ public:
// It must be okay to call this multiple times. Subsequent calls should
// be ignored.
virtual Status close(RuntimeState* state, Status exec_status) {
+ profile()->add_to_span();
_closed = true;
return Status::OK();
}
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index be48218fb6..f581a11c40 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -284,6 +284,8 @@ Status ExecNode::close(RuntimeState* state) {
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
}
+ runtime_profile()->add_to_span();
+
return result;
}
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 381649c730..6599785a41 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -688,6 +688,7 @@ void PlanFragmentExecutor::close() {
<< print_id(_runtime_state->fragment_instance_id());
}
+ profile()->add_to_span();
_closed = true;
}
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 4babd77269..51e12397f1 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -29,7 +29,6 @@
#include "util/container_util.hpp"
#include "util/cpu_info.h"
#include "util/debug_util.h"
-#include "util/pretty_printer.h"
#include "util/thrift_util.h"
#include "util/url_coding.h"
@@ -398,7 +397,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const
std::string& name, TU
DCHECK(parent_counter_name == ROOT_COUNTER ||
_counter_map.find(parent_counter_name) != _counter_map.end());
- Counter* counter = _pool->add(new Counter(type, 0, name));
+ Counter* counter = _pool->add(new Counter(type, 0));
_counter_map[name] = counter;
std::set<std::string>* child_counters =
find_or_insert(&_child_counter_map, parent_counter_name,
std::set<std::string>());
@@ -528,6 +527,88 @@ void RuntimeProfile::pretty_print(std::ostream* s, const
std::string& prefix) co
}
}
+void RuntimeProfile::add_to_span() {
+ auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
+ if (!span->IsRecording() || _added_to_span) {
+ return;
+ }
+ _added_to_span = true;
+
+ CounterMap counter_map;
+ ChildCounterMap child_counter_map;
+ {
+ std::lock_guard<std::mutex> l(_counter_map_lock);
+ counter_map = _counter_map;
+ child_counter_map = _child_counter_map;
+ }
+
+ auto total_time = counter_map.find("TotalTime");
+ DCHECK(total_time != counter_map.end());
+
+ // profile name like "VDataBufferSender
(dst_fragment_instance_id=-2608c96868f3b77d--713968f450bfbe0d):"
+ // to "VDataBufferSender"
+ auto i = _name.find_first_of("(: ");
+ auto short_name = _name.substr(0, i);
+ span->SetAttribute("TotalTime", print_json_counter(short_name,
total_time->second));
+
+ {
+ std::lock_guard<std::mutex> l(_info_strings_lock);
+ for (const std::string& key : _info_strings_display_order) {
+ // nlohmann json will core dump when serializing 'KeyRanges', here
temporarily skip it.
+ if (key.compare("KeyRanges") == 0) {
+ continue;
+ }
+ span->SetAttribute(key, print_json_info(short_name,
_info_strings.find(key)->second));
+ }
+ }
+
+ RuntimeProfile::add_child_counters_to_span(span, short_name, ROOT_COUNTER,
counter_map,
+ child_counter_map);
+
+ ChildVector children;
+ {
+ std::lock_guard<std::mutex> l(_children_lock);
+ children = _children;
+ }
+
+ for (int i = 0; i < children.size(); ++i) {
+ RuntimeProfile* profile = children[i].first;
+ profile->add_to_span();
+ }
+}
+
+void RuntimeProfile::add_child_counters_to_span(OpentelemetrySpan span,
+ const std::string&
profile_name,
+ const std::string&
counter_name,
+ const CounterMap& counter_map,
+ const ChildCounterMap&
child_counter_map) {
+ ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
+
+ if (itr != child_counter_map.end()) {
+ const std::set<std::string>& child_counters = itr->second;
+ for (const std::string& child_counter : child_counters) {
+ CounterMap::const_iterator iter = counter_map.find(child_counter);
+ DCHECK(iter != counter_map.end());
+ span->SetAttribute(iter->first, print_json_counter(profile_name,
iter->second));
+ RuntimeProfile::add_child_counters_to_span(span, profile_name,
child_counter,
+ counter_map,
child_counter_map);
+ }
+ }
+}
+
+std::string RuntimeProfile::print_json_info(const std::string& profile_name,
std::string value) {
+ rapidjson::StringBuffer s;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(s);
+
+ writer.StartObject();
+ writer.Key("profile");
+ writer.String(profile_name.c_str());
+ writer.Key("pretty");
+ writer.String(value.c_str());
+ writer.EndObject();
+ return s.GetString();
+}
+
void RuntimeProfile::to_thrift(TRuntimeProfileTree* tree) {
tree->nodes.clear();
to_thrift(&tree->nodes);
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 6a84def821..ad3424b326 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -29,10 +29,13 @@
#include <mutex>
#include <thread>
-#include "common/logging.h"
#include "gen_cpp/RuntimeProfile_types.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
#include "util/binary_cast.hpp"
+#include "util/pretty_printer.h"
#include "util/stopwatch.hpp"
+#include "util/telemetry/telemetry.h"
namespace doris {
@@ -71,8 +74,7 @@ class RuntimeProfile {
public:
class Counter {
public:
- Counter(TUnit::type type, int64_t value = 0, std::string name = "")
- : _value(value), _type(type), _name(std::move(name)) {}
+ Counter(TUnit::type type, int64_t value = 0) : _value(value),
_type(type) {}
virtual ~Counter() = default;
virtual void update(int64_t delta) { _value.fetch_add(delta,
std::memory_order_relaxed); }
@@ -94,16 +96,11 @@ public:
TUnit::type type() const { return _type; }
- std::string name() const { return _name; }
-
- void set_name(std::string name) { _name = std::move(name); }
-
private:
friend class RuntimeProfile;
std::atomic<int64_t> _value;
TUnit::type _type;
- std::string _name;
};
class DerivedCounter;
@@ -315,6 +312,8 @@ public:
// Does not hold locks when it makes any function calls.
void pretty_print(std::ostream* s, const std::string& prefix = "") const;
+ void add_to_span();
+
// Serializes profile to thrift.
// Does not hold locks when it makes any function calls.
void to_thrift(TRuntimeProfileTree* tree);
@@ -446,6 +445,8 @@ private:
// of the total time in the entire profile tree.
double _local_time_percent;
+ bool _added_to_span {false};
+
enum PeriodicCounterType {
RATE_COUNTER = 0,
SAMPLING_COUNTER,
@@ -483,6 +484,18 @@ private:
static void print_child_counters(const std::string& prefix, const
std::string& counter_name,
const CounterMap& counter_map,
const ChildCounterMap& child_counter_map,
std::ostream* s);
+
+ static void add_child_counters_to_span(OpentelemetrySpan span, const
std::string& profile_name,
+ const std::string& counter_name,
+ const CounterMap& counter_map,
+ const ChildCounterMap&
child_counter_map);
+
+ static std::string print_json_counter(const std::string& profile_name,
Counter* counter) {
+ return print_json_info(profile_name,
+ PrettyPrinter::print(counter->value(),
counter->type()));
+ }
+
+ static std::string print_json_info(const std::string& profile_name,
std::string value);
};
// Utility class to update the counter at object construction and destruction.
diff --git a/be/src/util/telemetry/telemetry.cpp
b/be/src/util/telemetry/telemetry.cpp
index 5bbd7d4537..b6301862a4 100644
--- a/be/src/util/telemetry/telemetry.cpp
+++ b/be/src/util/telemetry/telemetry.cpp
@@ -17,16 +17,18 @@
#include "telemetry.h"
+#include <boost/algorithm/string/case_conv.hpp>
+
#include "common/config.h"
+#include "common/logging.h"
#include "opentelemetry/context/propagation/global_propagator.h"
#include "opentelemetry/context/propagation/text_map_propagator.h"
#include "opentelemetry/exporters/otlp/otlp_http_exporter.h"
#include "opentelemetry/exporters/zipkin/zipkin_exporter.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/sdk/trace/batch_span_processor.h"
-#include "opentelemetry/trace/noop.h"
+#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/propagation/http_trace_context.h"
-#include "opentelemetry/trace/provider.h"
#include "service/backend_options.h"
namespace trace = opentelemetry::trace;
diff --git a/be/src/util/telemetry/telemetry.h
b/be/src/util/telemetry/telemetry.h
index 96f8667901..9b6142dc95 100644
--- a/be/src/util/telemetry/telemetry.h
+++ b/be/src/util/telemetry/telemetry.h
@@ -17,11 +17,7 @@
#pragma once
-#include "opentelemetry/context/context.h"
-#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"
-#include "util/pretty_printer.h"
-#include "util/runtime_profile.h"
/// A trace represents the execution process of a single request in the
system, span represents a
/// logical operation unit with start time and execution duration in the
system, and multiple spans
@@ -87,14 +83,5 @@ inline bool is_current_span_valid() {
return
opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid();
}
-inline void set_span_attribute(OpentelemetrySpan& span,
RuntimeProfile::Counter* const counter) {
- span->SetAttribute(counter->name(), PrettyPrinter::print(counter->value(),
counter->type()));
-}
-
-inline void set_current_span_attribute(RuntimeProfile::Counter* const counter)
{
- opentelemetry::trace::Tracer::GetCurrentSpan()->SetAttribute(
- counter->name(), PrettyPrinter::print(counter->value(),
counter->type()));
-}
-
} // namespace telemetry
} // namespace doris
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 0b2e814e55..16c41043dc 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -384,7 +384,6 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
_add_blocks(blocks);
}
}
- telemetry::set_span_attribute(span, _scanner_sched_counter);
VLOG_CRITICAL << "TransferThread finish.";
_transfer_done = true;
@@ -397,8 +396,6 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
}
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
- START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
- "VOlapScanNode::scanner_thread");
SCOPED_ATTACH_TASK(_runtime_state);
Thread::set_self_name("volap_scanner");
int64_t wait_time = scanner->update_wait_worker_timer();
@@ -880,7 +877,6 @@ Status VOlapScanNode::start_scan_thread(RuntimeState*
state) {
_transfer_done = true;
return Status::OK();
}
- auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
// ranges constructed from scan keys
std::vector<std::unique_ptr<OlapScanRange>> cond_ranges;
@@ -955,8 +951,6 @@ Status VOlapScanNode::start_scan_thread(RuntimeState*
state) {
}
COUNTER_SET(_num_disks_accessed_counter,
static_cast<int64_t>(disk_set.size()));
COUNTER_SET(_num_scanners, static_cast<int64_t>(_volap_scanners.size()));
- telemetry::set_span_attribute(span, _num_disks_accessed_counter);
- telemetry::set_span_attribute(span, _num_scanners);
// init progress
std::stringstream ss;
@@ -1188,7 +1182,6 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
}
// post volap scanners to thread-pool
- auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
ThreadPoolToken* thread_token = nullptr;
if (_limit > -1 && _limit < 1024) {
thread_token = state->get_query_fragments_ctx()->get_serial_token();
@@ -1198,10 +1191,8 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
auto iter = olap_scanners.begin();
if (thread_token != nullptr) {
while (iter != olap_scanners.end()) {
- auto s = thread_token->submit_func([this, scanner = *iter,
parent_span = cur_span] {
- opentelemetry::trace::Scope scope {parent_span};
- this->scanner_thread(scanner);
- });
+ auto s = thread_token->submit_func(
+ [this, scanner = *iter] { this->scanner_thread(scanner);
});
if (s.ok()) {
(*iter)->start_wait_worker_timer();
COUNTER_UPDATE(_scanner_sched_counter, 1);
@@ -1216,10 +1207,7 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
PriorityThreadPool* remote_thread_pool =
state->exec_env()->remote_scan_thread_pool();
while (iter != olap_scanners.end()) {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner = *iter, parent_span =
cur_span] {
- opentelemetry::trace::Scope scope {parent_span};
- this->scanner_thread(scanner);
- };
+ task.work_function = [this, scanner = *iter] {
this->scanner_thread(scanner); };
task.priority = _nice;
task.queue_id =
state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();
@@ -1264,7 +1252,6 @@ Status VOlapScanNode::set_scan_ranges(const
std::vector<TScanRangeParams>& scan_
_scan_ranges.emplace_back(new
TPaloScanRange(scan_range.scan_range.palo_scan_range));
COUNTER_UPDATE(_tablet_counter, 1);
}
- telemetry::set_current_span_attribute(_tablet_counter);
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index e2f85d2ac3..63a7d9fcb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -87,6 +87,9 @@ public class AuditEvent {
@AuditField(value = "SqlDigest")
public String sqlDigest = "";
+ @AuditField(value = "TraceId")
+ public String traceId = "";
+
public static class AuditEventBuilder {
private AuditEvent auditEvent = new AuditEvent();
@@ -193,6 +196,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setTraceId(String traceId) {
+ auditEvent.traceId = traceId;
+ return this;
+ }
+
public AuditEvent build() {
return this.auditEvent;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index ad7d8fe5e8..aca4527f1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -60,6 +60,8 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
@@ -149,6 +151,7 @@ public class ConnectProcessor {
// slow query
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();
+ SpanContext spanContext =
Span.fromContext(Context.current()).getSpanContext();
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setState(ctx.getState().toString()).setQueryTime(elapseMs)
@@ -158,7 +161,8 @@ public class ConnectProcessor {
.setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
- .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()));
+ .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
+ .setTraceId(spanContext.isValid() ? spanContext.getTraceId() :
"");
if (ctx.getState().isQuery()) {
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
@@ -299,6 +303,9 @@ public class ConnectProcessor {
// auditInfoList can be empty if we encounter analysis error.
auditAfterExec(originStmt.replace("\n", " "), null, null);
}
+ if (executor != null) {
+ executor.addProfileToSpan();
+ }
}
// analyze the origin stmt and return multi-statements
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 744b2880bd..c77da6d703 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -121,6 +121,7 @@ import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -214,8 +215,6 @@ public class StmtExecutor implements ProfileWriter {
// At the end of query execution, we begin to add up profile
private void initProfile(QueryPlannerProfile plannerProfile, boolean
waiteBeReport) {
- long currentTimestamp = System.currentTimeMillis();
- long totalTimeMs = currentTimestamp - context.getStartTime();
RuntimeProfile queryProfile;
// when a query hits the sql cache, `coord` is null.
if (coord == null) {
@@ -227,32 +226,17 @@ public class StmtExecutor implements ProfileWriter {
profile = new RuntimeProfile("Query");
summaryProfile = new RuntimeProfile("Summary");
profile.addChild(summaryProfile);
- summaryProfile.addInfoString(ProfileManager.QUERY_ID,
DebugUtil.printId(context.queryId()));
summaryProfile.addInfoString(ProfileManager.START_TIME,
TimeUtils.longToTimeString(context.getStartTime()));
- summaryProfile.addInfoString(ProfileManager.END_TIME,
- waiteBeReport ?
TimeUtils.longToTimeString(currentTimestamp) : "N/A");
- summaryProfile.addInfoString(ProfileManager.TOTAL_TIME,
DebugUtil.getPrettyStringMs(totalTimeMs));
- summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, queryType);
- summaryProfile.addInfoString(ProfileManager.QUERY_STATE,
- !waiteBeReport &&
context.getState().getStateType().equals(MysqlStateType.OK)
- ? "RUNNING" : context.getState().toString());
- summaryProfile.addInfoString(ProfileManager.DORIS_VERSION,
Version.DORIS_BUILD_VERSION);
- summaryProfile.addInfoString(ProfileManager.USER,
context.getQualifiedUser());
- summaryProfile.addInfoString(ProfileManager.DEFAULT_DB,
context.getDatabase());
- summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT,
originStmt.originStmt);
- summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ?
"Yes" : "No");
-
+ updateSummaryProfile(waiteBeReport);
+ for (Map.Entry<String, String> entry :
getSummaryInfo().entrySet()) {
+ summaryProfile.addInfoString(entry.getKey(), entry.getValue());
+ }
summaryProfile.addInfoString(ProfileManager.TRACE_ID,
context.getSessionVariable().getTraceId());
plannerRuntimeProfile = new RuntimeProfile("Execution Summary");
summaryProfile.addChild(plannerRuntimeProfile);
profile.addChild(queryProfile);
} else {
- summaryProfile.addInfoString(ProfileManager.END_TIME,
- waiteBeReport ?
TimeUtils.longToTimeString(currentTimestamp) : "N/A");
- summaryProfile.addInfoString(ProfileManager.TOTAL_TIME,
DebugUtil.getPrettyStringMs(totalTimeMs));
- summaryProfile.addInfoString(ProfileManager.QUERY_STATE,
- !waiteBeReport &&
context.getState().getStateType().equals(MysqlStateType.OK)
- ? "RUNNING" : context.getState().toString());
+ updateSummaryProfile(waiteBeReport);
}
plannerProfile.initRuntimeProfile(plannerRuntimeProfile);
@@ -262,6 +246,40 @@ public class StmtExecutor implements ProfileWriter {
}
}
+ private void updateSummaryProfile(boolean waiteBeReport) {
+ Preconditions.checkNotNull(summaryProfile);
+ long currentTimestamp = System.currentTimeMillis();
+ long totalTimeMs = currentTimestamp - context.getStartTime();
+ summaryProfile.addInfoString(ProfileManager.END_TIME,
+ waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) :
"N/A");
+ summaryProfile.addInfoString(ProfileManager.TOTAL_TIME,
DebugUtil.getPrettyStringMs(totalTimeMs));
+ summaryProfile.addInfoString(ProfileManager.QUERY_STATE,
+ !waiteBeReport &&
context.getState().getStateType().equals(MysqlStateType.OK) ? "RUNNING" :
+ context.getState().toString());
+ }
+
+ private Map<String, String> getSummaryInfo() {
+ Map<String, String> infos = Maps.newLinkedHashMap();
+ infos.put(ProfileManager.QUERY_ID,
DebugUtil.printId(context.queryId()));
+ infos.put(ProfileManager.QUERY_TYPE, queryType);
+ infos.put(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
+ infos.put(ProfileManager.USER, context.getQualifiedUser());
+ infos.put(ProfileManager.DEFAULT_DB, context.getDatabase());
+ infos.put(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
+ infos.put(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
+ return infos;
+ }
+
+ public void addProfileToSpan() {
+ Span span = Span.fromContext(Context.current());
+ if (!span.isRecording()) {
+ return;
+ }
+ for (Map.Entry<String, String> entry : getSummaryInfo().entrySet()) {
+ span.setAttribute(entry.getKey(), entry.getValue());
+ }
+ }
+
public Planner planner() {
return planner;
}
@@ -331,10 +349,6 @@ public class StmtExecutor implements ProfileWriter {
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
Span executeSpan =
context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
- executeSpan.setAttribute("queryId", DebugUtil.printId(queryId));
- if (originStmt != null) {
- executeSpan.setAttribute("sql", originStmt.originStmt);
- }
try (Scope scope = executeSpan.makeCurrent()) {
execute(queryId);
} finally {
@@ -349,7 +363,6 @@ public class StmtExecutor implements ProfileWriter {
// Exception:
// IOException: talk with client failed.
public void execute(TUniqueId queryId) throws Exception {
- Span span = Span.fromContext(Context.current());
context.setStartTime();
plannerProfile.setQueryBeginTime();
@@ -436,7 +449,6 @@ public class StmtExecutor implements ProfileWriter {
AuditLog.getQueryAudit().log("Query {} {} times
with new query id: {}",
DebugUtil.printId(queryId), i,
DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
- span.setAttribute("queryId",
DebugUtil.printId(newQueryId));
}
handleQueryStmt();
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]