This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6c988e73ebf [Feature](profile)add shuffle send rows/bytes #30456
6c988e73ebf is described below
commit 6c988e73ebf4ec2aac558a3cffc76f36baed06ad
Author: wangbo <[email protected]>
AuthorDate: Sat Jan 27 17:09:17 2024 +0800
[Feature](profile)add shuffle send rows/bytes #30456
---
be/src/exec/data_sink.cpp | 16 ++++++++++++++++
be/src/exec/data_sink.h | 7 ++++++-
be/src/exec/exec_node.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_buffer.h | 1 -
be/src/pipeline/pipeline_x/operator.cpp | 4 +++-
be/src/pipeline/pipeline_x/operator.h | 6 +++++-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 5 ++++-
be/src/runtime/query_statistics.cpp | 11 +++++++----
be/src/runtime/query_statistics.h | 19 +++++++++++++++++--
be/src/vec/sink/vdata_stream_sender.cpp | 3 +++
.../org/apache/doris/plugin/audit/AuditEvent.java | 4 ++++
.../workloadschedpolicy/WorkloadRuntimeStatusMgr.java | 5 +++++
.../ActiveQueriesTableValuedFunction.java | 2 ++
.../apache/doris/tablefunction/MetadataGenerator.java | 4 ++++
gensrc/thrift/FrontendService.thrift | 2 ++
15 files changed, 79 insertions(+), 12 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c58bbdb2523..5809912f4a2 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -30,6 +30,8 @@
#include <string>
#include "common/config.h"
+#include "runtime/query_context.h"
+#include "runtime/query_statistics.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
@@ -44,6 +46,14 @@ namespace doris {
class DescriptorTbl;
class TExpr;
+DataSink::DataSink(const RowDescriptor& desc) : _row_desc(desc) {
+ _query_statistics = std::make_shared<QueryStatistics>();
+}
+
+std::shared_ptr<QueryStatistics> DataSink::get_query_statistics_ptr() {
+ return _query_statistics;
+}
+
Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink&
thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
@@ -175,6 +185,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (*sink != nullptr) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
+ if (state->get_query_ctx()) {
+ state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
+ }
}
return Status::OK();
@@ -318,6 +331,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (*sink != nullptr) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
RETURN_IF_ERROR((*sink)->prepare(state));
+ if (state->get_query_ctx()) {
+ state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
+ }
}
return Status::OK();
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index c0b27e8ae90..e08d40ab023 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -40,6 +40,7 @@ class TDataSink;
class TExpr;
class TPipelineFragmentParams;
class TOlapTableSink;
+class QueryStatistics;
namespace vectorized {
class Block;
@@ -48,7 +49,7 @@ class Block;
// Superclass of all data sinks.
class DataSink {
public:
- DataSink(const RowDescriptor& desc) : _row_desc(desc) {}
+ DataSink(const RowDescriptor& desc);
virtual ~DataSink() {}
virtual Status init(const TDataSink& thrift_sink);
@@ -103,6 +104,8 @@ public:
virtual bool can_write() { return true; }
+ std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
+
private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
@@ -124,6 +127,8 @@ protected:
_output_rows_counter = ADD_COUNTER_WITH_LEVEL(_profile,
"RowsProduced", TUnit::UNIT, 1);
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile,
"BlocksProduced", TUnit::UNIT, 1);
}
+
+ std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
} // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ac6e7eae9a0..368e94562a2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -255,7 +255,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state,
ObjectPool* pool,
// Step 1 Create current ExecNode according to current thrift plan node.
ExecNode* cur_exec_node = nullptr;
RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs,
&cur_exec_node));
- if (cur_exec_node != nullptr) {
+ if (cur_exec_node != nullptr && state->get_query_ctx()) {
state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics());
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index f0b55d528ec..c04d2973d5e 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -273,7 +273,6 @@ private:
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<Dependency> _finish_dependency;
- QueryStatistics* _statistics = nullptr;
std::atomic<bool> _should_stop {false};
};
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index db687865657..ef17ee70ed6 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -342,7 +342,9 @@ Status
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : _parent(parent), _state(state) {}
+ : _parent(parent), _state(state) {
+ _query_statistics = std::make_shared<QueryStatistics>();
+}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
: _num_rows_returned(0),
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 1c076bd5a69..61afe40e126 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -111,7 +111,7 @@ public:
// override in Scan MultiCastSink
virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
- std::shared_ptr<QueryStatistics> query_statistics_ptr() { return _query_statistics; }
+ std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
protected:
friend class OperatorXBase;
@@ -424,6 +424,8 @@ public:
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
+ std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
+
protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
@@ -447,6 +449,8 @@ protected:
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
+
+ std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
class DataSinkOperatorXBase : public OperatorBase {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9a88c417be0..f0b87ce8613 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -92,6 +92,9 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams&
local_params, const
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(),
no_scan_ranges);
auto* parent_profile =
_state->get_sink_local_state(_sink->operator_id())->profile();
+ query_ctx->register_query_statistics(
+ _state->get_sink_local_state(_sink->operator_id())->get_query_statistics_ptr());
+
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
@@ -105,7 +108,7 @@ Status PipelineXTask::prepare(const
TPipelineInstanceParams& local_params, const
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
query_ctx->register_query_statistics(
- _state->get_local_state(op->operator_id())->query_statistics_ptr());
+ _state->get_local_state(op->operator_id())->get_query_statistics_ptr());
}
_block = doris::vectorized::Block::create_unique();
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index ab49b02ad43..e12e4e08c52 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -40,10 +40,11 @@ void NodeStatistics::from_pb(const PNodeStatistics&
node_statistics) {
}
void QueryStatistics::merge(const QueryStatistics& other) {
- scan_rows += other.scan_rows;
- scan_bytes += other.scan_bytes;
- int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed);
- cpu_nanos += other_cpu_time;
+ scan_rows += other.scan_rows.load(std::memory_order_relaxed);
+ scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
+ cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
+ shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
+ shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
int64_t other_peak_mem =
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -85,6 +86,8 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics)
const {
statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_current_used_memory_bytes(
current_used_memory_bytes.load(std::memory_order_relaxed));
+ statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
+ statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index abaf0a251a8..8a18a152e4f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -65,7 +65,9 @@ public:
cpu_nanos(0),
returned_rows(0),
max_peak_memory_bytes(0),
- current_used_memory_bytes(0) {}
+ current_used_memory_bytes(0),
+ shuffle_send_bytes(0),
+ shuffle_send_rows(0) {}
virtual ~QueryStatistics();
void merge(const QueryStatistics& other);
@@ -82,6 +84,14 @@ public:
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}
+ void add_shuffle_send_bytes(int64_t delta_bytes) {
+ this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
+ }
+
+ void add_shuffle_send_rows(int64_t delta_rows) {
+ this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
+ }
+
NodeStatistics* add_nodes_statistics(int64_t node_id) {
NodeStatistics* nodeStatistics = nullptr;
auto iter = _nodes_statistics_map.find(node_id);
@@ -115,8 +125,10 @@ public:
void clear() {
scan_rows.store(0, std::memory_order_relaxed);
scan_bytes.store(0, std::memory_order_relaxed);
-
cpu_nanos.store(0, std::memory_order_relaxed);
+ shuffle_send_bytes.store(0, std::memory_order_relaxed);
+ shuffle_send_rows.store(0, std::memory_order_relaxed);
+
returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
clearNodeStatistics();
@@ -152,6 +164,9 @@ private:
NodeStatisticsMap _nodes_statistics_map;
bool _collected = false;
std::atomic<int64_t> current_used_memory_bytes;
+
+ std::atomic<int64_t> shuffle_send_bytes;
+ std::atomic<int64_t> shuffle_send_rows;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 79ffebcc21e..35900964274 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -765,6 +765,9 @@ Status BlockSerializer<Parent>::serialize_block(const
Block* src, PBlock* dest,
COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes *
num_receivers);
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter,
uncompressed_bytes * num_receivers);
COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+ _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
+ num_receivers);
+ _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
}
return Status::OK();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index 5c122c98965..c8915966725 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -88,6 +88,10 @@ public class AuditEvent {
public String stmt = "";
@AuditField(value = "CpuTimeMS")
public long cpuTimeMs = -1;
+ @AuditField(value = "ShuffleSendBytes")
+ public long shuffleSendBytes = -1;
+ @AuditField(value = "ShuffleSendRows")
+ public long shuffleSendRows = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
@AuditField(value = "peakMemoryBytes")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 3c5d7fc8bf1..7bec1d1954a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -70,6 +70,8 @@ public class WorkloadRuntimeStatusMgr {
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.peakMemoryBytes =
queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
+ auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
+ auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
}
@@ -139,6 +141,7 @@ public class WorkloadRuntimeStatusMgr {
} else {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, TQueryStatistics> entry :
params.query_statistics_map.entrySet()) {
+ LOG.info("log2109 queryid={}, shuffle={}", entry.getKey(), entry.getValue().shuffle_send_bytes);
queryIdMap.put(entry.getKey(), entry.getValue());
queryLastReportTime.put(entry.getKey(), currentTime);
}
@@ -175,6 +178,8 @@ public class WorkloadRuntimeStatusMgr {
dst.scan_rows += src.scan_rows;
dst.scan_bytes += src.scan_bytes;
dst.cpu_ms += src.cpu_ms;
+ dst.shuffle_send_bytes += src.shuffle_send_bytes;
+ dst.shuffle_send_rows += src.shuffle_send_rows;
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index 0839ae56a67..27f65ed7680 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -46,6 +46,8 @@ public class ActiveQueriesTableValuedFunction extends
MetadataTableValuedFunctio
new Column("ScanBytes", PrimitiveType.BIGINT),
new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
+ new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
+ new Column("ShuffleSendRows", PrimitiveType.BIGINT),
new Column("Database", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()),
new Column("Sql", ScalarType.createStringType()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e4768660698..21505586491 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -441,6 +441,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
trow.addToColumnValue(new
TCell().setLongVal(qs.max_peak_memory_bytes));
trow.addToColumnValue(new
TCell().setLongVal(qs.current_used_memory_bytes));
+ trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes));
+ trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows));
} else {
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
@@ -448,6 +450,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
}
if (queryInfo.getConnectContext() != null) {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 7b65103c581..d96ee346148 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,8 @@ struct TQueryStatistics {
5: optional i64 max_peak_memory_bytes
6: optional i64 current_used_memory_bytes
7: optional i64 workload_group_id
+ 8: optional i64 shuffle_send_bytes
+ 9: optional i64 shuffle_send_rows
}
struct TReportWorkloadRuntimeStatusParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]