This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6b2eed779c9 [feature](AuditLog) add scanRows scanBytes in auditlog
(#25435)
6b2eed779c9 is described below
commit 6b2eed779c9bc3df551e61f5efeb6f663ad38d36
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 25 10:00:35 2023 +0800
[feature](AuditLog) add scanRows scanBytes in auditlog (#25435)
---
be/src/exec/exec_node.cpp | 8 +++++++
be/src/exec/exec_node.h | 2 ++
be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 +++++++
be/src/pipeline/exec/exchange_sink_buffer.h | 3 +++
be/src/pipeline/exec/exchange_sink_operator.cpp | 1 +
be/src/pipeline/exec/operator.h | 21 ++++++++++++++++
be/src/pipeline/pipeline.h | 10 ++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 2 ++
be/src/pipeline/pipeline_task.cpp | 25 +++++++++++++++++++
be/src/pipeline/pipeline_task.h | 3 +++
be/src/runtime/query_statistics.cpp | 32 +++++++++++++------------
be/src/runtime/query_statistics.h | 20 ++++++++++++----
be/src/vec/exec/scan/new_olap_scan_node.cpp | 9 +++++--
be/src/vec/exec/scan/new_olap_scan_node.h | 1 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 3 ++-
be/src/vec/exec/scan/vscan_node.cpp | 3 ++-
be/src/vec/exec/scan/vscan_node.h | 1 +
be/src/vec/exec/scan/vscanner.cpp | 2 ++
be/src/vec/exec/scan/vscanner.h | 2 ++
be/src/vec/exec/vexchange_node.cpp | 6 ++++-
be/src/vec/exec/vexchange_node.h | 1 +
be/src/vec/runtime/vdata_stream_recvr.cpp | 11 +++++++++
be/src/vec/runtime/vdata_stream_recvr.h | 3 +++
be/src/vec/sink/vdata_stream_sender.cpp | 15 ++++++++++--
be/src/vec/sink/vdata_stream_sender.h | 1 +
25 files changed, 167 insertions(+), 26 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4c6a734347e..ed8b11457a6 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -187,6 +187,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics*
statistics) {
return Status::OK();
}
+// Sender-aware variant of collect_query_statistics(): recursively collects from
+// every child, forwarding sender_id unchanged. Nodes that receive data from other
+// fragments (e.g. VExchangeNode) override it to pick only that sender's statistics.
+Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int
sender_id) {
+ DCHECK(statistics != nullptr);
+ for (auto child_node : _children) {
+ RETURN_IF_ERROR(child_node->collect_query_statistics(statistics,
sender_id));
+ }
+ return Status::OK();
+}
+
void ExecNode::release_resource(doris::RuntimeState* state) {
if (!_is_resource_released) {
if (_rows_returned_counter != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 8aeecc2b8ca..d4d90546ffb 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -162,6 +162,8 @@ public:
// error.
[[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics);
+ [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics,
+ int sender_id);
// close() will get called for every exec node, regardless of what else is
called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index b3d2222e503..577eb4a4622 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -237,6 +237,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ if (_statistics && _statistics->collected()) {
+ auto statistic = brpc_request->mutable_query_statistics();
+ _statistics->to_pb(statistic);
+ }
if (request.block) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -297,6 +301,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
if (request.block_holder->get_block()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
+ if (_statistics && _statistics->collected()) {
+ auto statistic = brpc_request->mutable_query_statistics();
+ _statistics->to_pb(statistic);
+ }
auto* closure = request.channel->get_closure(id, request.eos,
request.block_holder);
ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c47d6c6a14b..d63dd2a55f9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -33,6 +33,7 @@
#include "common/global_types.h"
#include "common/status.h"
+#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
@@ -188,6 +189,7 @@ public:
_queue_dependency = queue_dependency;
_finish_dependency = finish_dependency;
}
+ // Non-owning pointer. Once the statistics are marked collected, _send_rpc
+ // serializes them into each brpc request it sends.
+ void set_query_statistics(QueryStatistics* statistics) { _statistics =
statistics; }
private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
@@ -237,6 +239,7 @@ private:
int _queue_capacity = 0;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
+ QueryStatistics* _statistics = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 89679ceff89..047e336a1e2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -66,6 +66,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
_sink_buffer =
std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
id, _dest_node_id, _sink->_sender_id, _state->be_number(),
state->get_query_ctx());
+ _sink_buffer->set_query_statistics(_sink->query_statistics());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 125f8fd89e4..5362be88e89 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -174,6 +174,14 @@ public:
virtual bool is_source() const;
+    // Default implementations: an operator contributes no query statistics and
+    // ignores the shared statistics object unless a subclass overrides these.
+    // (No trailing ';' after in-class bodies: it is an empty declaration that
+    // trips -Wextra-semi.)
+    virtual Status collect_query_statistics(QueryStatistics* statistics) {
+        return Status::OK();
+    }
+
+    virtual Status collect_query_statistics(QueryStatistics* statistics, int sender_id) {
+        return Status::OK();
+    }
+
+    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {}
+
virtual Status init(const TDataSink& tsink) { return Status::OK(); }
// Prepare for running. (e.g. resource allocation, etc.)
@@ -318,6 +326,9 @@ public:
Status finalize(RuntimeState* state) override { return Status::OK(); }
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
return _sink->profile(); }
+ // Forwards the (shared) statistics object to the wrapped sink.
+ void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override {
+ _sink->set_query_statistics(statistics);
+ }
protected:
NodeType* _sink;
@@ -388,6 +399,16 @@ public:
return _node->runtime_profile();
}
+    // Both overloads simply delegate to the wrapped node and propagate its status.
+    Status collect_query_statistics(QueryStatistics* statistics) override {
+        return _node->collect_query_statistics(statistics);
+    }
+
+    Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override {
+        return _node->collect_query_statistics(statistics, sender_id);
+    }
+
protected:
NodeType* _node;
bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index c67ef6d29e2..7dcffb410a2 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -117,6 +117,14 @@ public:
}
[[nodiscard]] PipelineId id() const { return _pipeline_id; }
+ // Marks this pipeline as the root pipeline of its fragment; only tasks of the
+ // root pipeline collect query statistics.
+ void set_is_root_pipeline() { _is_root_pipeline = true; }
+ bool is_root_pipeline() const { return _is_root_pipeline; }
+ // When set, tasks of this pipeline collect query statistics on every sink() call
+ // in PipelineTask::execute, not only once the source reports FINISHED.
+ void set_collect_query_statistics_with_every_batch() {
+ _collect_query_statistics_with_every_batch = true;
+ }
+ [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
+ return _collect_query_statistics_with_every_batch;
+ }
private:
void _init_profile();
@@ -168,6 +176,8 @@ private:
*/
bool _always_can_read = false;
bool _always_can_write = false;
+ bool _is_root_pipeline = false;
+ bool _collect_query_statistics_with_every_batch = false;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index d7c6aebdc16..226b1f06951 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -320,6 +320,8 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
}
_root_pipeline = fragment_context->add_pipeline();
+ _root_pipeline->set_is_root_pipeline();
+ _root_pipeline->set_collect_query_statistics_with_every_batch();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index a46914fb601..7e907b318d2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -59,6 +59,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
index, RuntimeState*
_root(_operators.back()),
_sink(sink) {
_pipeline_task_watcher.start();
+ _query_statistics.reset(new QueryStatistics());
+ _sink->set_query_statistics(_query_statistics);
+ _collect_query_statistics_with_every_batch =
+ _pipeline->collect_query_statistics_with_every_batch();
}
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index,
RuntimeState* state,
@@ -295,6 +299,10 @@ Status PipelineTask::execute(bool* eos) {
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
+ if (_data_state == SourceState::FINISHED ||
+ _collect_query_statistics_with_every_batch) {
+ RETURN_IF_ERROR(_collect_query_statistics());
+ }
auto status = _sink->sink(_state, block, _data_state);
if (UNLIKELY(!status.ok() || block->rows() == 0)) {
if (_fragment_context->is_group_commit()) {
@@ -331,6 +339,23 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}
+// Rebuilds _query_statistics from the operator tree of this task.
+Status PipelineTask::_collect_query_statistics() {
+    // The exec node tree of a fragment is split into multiple pipelines; only the
+    // root pipeline needs to collect.
+    if (!_pipeline->is_root_pipeline()) {
+        return Status::OK();
+    }
+    // Statistics are recomputed from scratch on every collection; clear() also
+    // marks them as collected.
+    _query_statistics->clear();
+    // With a single instance per fragment everything can be collected; otherwise
+    // collect only what belongs to this instance (keyed by its instance index).
+    if (_state->num_per_fragment_instances() == 1) {
+        return _root->collect_query_statistics(_query_statistics.get());
+    }
+    return _root->collect_query_statistics(_query_statistics.get(),
+                                           _state->per_fragment_instance_idx());
+}
+
Status PipelineTask::try_close(Status exec_status) {
if (_try_close_flag) {
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 41f60c27653..99e41421bb3 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -368,6 +368,9 @@ protected:
int64_t _close_pipeline_time = 0;
RuntimeProfile::Counter* _pip_task_total_timer;
+ std::shared_ptr<QueryStatistics> _query_statistics;
+ Status _collect_query_statistics();
+ bool _collect_query_statistics_with_every_batch = false;
private:
Operators _operators; // left is _source, right is _root
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index a6754215ace..22c18faa1e6 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -20,6 +20,8 @@
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
+#include <memory>
+
namespace doris {
void NodeStatistics::merge(const NodeStatistics& other) {
@@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
recvr->merge(this);
}
+// Merges only the statistics that `recvr` holds for `sender_id` (no-op if none).
+void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
+    // Senders may call QueryStatisticsRecvr::insert concurrently, which mutates the
+    // map; the lookup and the merge must therefore happen under the receiver's lock.
+    std::lock_guard<SpinLock> l(recvr->_lock);
+    auto it = recvr->_query_statistics.find(sender_id);
+    if (it != recvr->_query_statistics.end()) {
+        merge(*it->second);
+    }
+}
+
void QueryStatistics::clearNodeStatistics() {
for (auto& pair : _nodes_statistics_map) {
delete pair.second;
@@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() {
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
std::lock_guard<SpinLock> l(_lock);
- QueryStatistics* query_statistics = nullptr;
- auto iter = _query_statistics.find(sender_id);
- if (iter == _query_statistics.end()) {
- query_statistics = new QueryStatistics;
- _query_statistics[sender_id] = query_statistics;
- } else {
- query_statistics = iter->second;
+    // Single map lookup: operator[] value-initializes an empty shared_ptr for a
+    // sender seen for the first time, which is then filled in.
+    auto& query_statistics = _query_statistics[sender_id];
+    if (query_statistics == nullptr) {
+        query_statistics = std::make_shared<QueryStatistics>();
}
- query_statistics->from_pb(statistics);
+    query_statistics->from_pb(statistics);
}
-QueryStatisticsRecvr::~QueryStatisticsRecvr() {
- // It is unnecessary to lock here, because the destructor will be
- // called alter DataStreamRecvr's close in ExchangeNode.
- for (auto& pair : _query_statistics) {
- delete pair.second;
- }
- _query_statistics.clear();
+// Local-exchange path: keeps a shared pointer to the sender's statistics instead of
+// deserializing a PQueryStatistics.
+void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) {
+    if (statistics == nullptr || !statistics->collected()) {
+        return;
+    }
+    // The existence check and the insertion must both happen under the lock: another
+    // sender may be inserting into the std::map concurrently.
+    std::lock_guard<SpinLock> l(_lock);
+    // try_emplace never overwrites, so the first object registered for a sender wins.
+    _query_statistics.try_emplace(sender_id, std::move(statistics));
}
} // namespace doris
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index c4ace8f23e2..42c1457472f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <map>
+#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
@@ -88,6 +89,7 @@ public:
void merge(QueryStatisticsRecvr* recvr);
+ void merge(QueryStatisticsRecvr* recvr, int sender_id);
// Get the maximum value from the peak memory collected by all node
statistics
int64_t calculate_max_peak_memory_bytes();
@@ -100,13 +102,18 @@ public:
returned_rows = 0;
max_peak_memory_bytes = 0;
clearNodeStatistics();
+ //clear() is used before collection, so calling "clear" is equivalent
to being collected.
+ set_collected();
}
void to_pb(PQueryStatistics* statistics);
void from_pb(const PQueryStatistics& statistics);
+ bool collected() const { return _collected; }
+ void set_collected() { _collected = true; }
private:
+ friend class QueryStatisticsRecvr;
int64_t scan_rows;
int64_t scan_bytes;
int64_t cpu_ms;
@@ -117,17 +124,22 @@ private:
// only set once by result sink when closing.
int64_t max_peak_memory_bytes;
// The statistics of the query on each backend.
- typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+ using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
+ bool _collected = false;
};
-
+using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
class QueryStatisticsRecvr {
public:
- ~QueryStatisticsRecvr();
+ ~QueryStatisticsRecvr() = default;
+ // Transmitted via RPC, incurring serialization overhead.
void insert(const PQueryStatistics& statistics, int sender_id);
+ // using local_exchange for transmission, only need to hold a shared
pointer.
+ void insert(QueryStatisticsPtr statistics, int sender_id);
+
private:
friend class QueryStatistics;
@@ -138,7 +150,7 @@ private:
}
}
- std::map<int, QueryStatistics*> _query_statistics;
+ std::map<int, QueryStatisticsPtr> _query_statistics;
SpinLock _lock;
};
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 128fb310392..3bbff6c20a6 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const
TPlanNode& tnode,
Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
if (!_is_pipeline_scan || _should_create_scanner) {
- statistics->add_scan_bytes(_read_compressed_counter->value());
- statistics->add_scan_rows(_raw_rows_counter->value());
+ // Report the scan-node "ScanByteRead" / "ScanRowsRead" counters.
+ // NOTE(review): ScanByteRead is accumulated from Block::allocated_bytes(), which
+ // may include unused capacity — confirm that is the intended "scan bytes" metric.
+ statistics->add_scan_bytes(_byte_read_counter->value());
+ statistics->add_scan_rows(_rows_read_counter->value());
statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
}
return Status::OK();
}
+// The sender id is ignored: everything this scan node collected is reported.
+Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, int /*sender_id*/) {
+    return collect_query_statistics(statistics);
+}
+
Status NewOlapScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
// if you want to add some profile in scan node, even it have not new
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 0725c37cf5e..cbbf3072872 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -64,6 +64,7 @@ public:
Status prepare(RuntimeState* state) override;
Status collect_query_statistics(QueryStatistics* statistics) override;
+ Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 26b5ead8156..eef7cf8271c 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -551,7 +551,8 @@ void NewOlapScanner::_update_realtime_counters() {
}
void NewOlapScanner::_update_counters_before_close() {
- if (!_state->enable_profile() || _has_updated_counter) {
+ // Please don't directly enable the profile here, we need to set
QueryStatistics using the counter inside.
+ if (_has_updated_counter) {
return;
}
_has_updated_counter = true;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 57dc6b66fd0..16ef362b96d 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -276,7 +276,8 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
Status VScanNode::_init_profile() {
// 1. counters for scan node
- _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead",
TUnit::UNIT);
+ _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead",
TUnit::UNIT);
+ _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead",
TUnit::BYTES);
_total_throughput_counter =
runtime_profile()->add_rate_counter("TotalReadThroughput",
_rows_read_counter);
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 2ecd6ac3202..458cb318b75 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -319,6 +319,7 @@ protected:
// rows read from the scanner (including those discarded by (pre)filters)
RuntimeProfile::Counter* _rows_read_counter;
+ RuntimeProfile::Counter* _byte_read_counter;
// Wall based aggregate read throughput [rows/sec]
RuntimeProfile::Counter* _total_throughput_counter;
RuntimeProfile::Counter* _num_scanners;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 0a43b3dd3d8..3f900d1ca31 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -95,6 +95,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block,
bool* eof) {
break;
}
_num_rows_read += block->rows();
+ _num_byte_read += block->allocated_bytes();
}
// 2. Filter the output block finally.
@@ -191,6 +192,7 @@ void VScanner::_update_counters_before_close() {
if (_parent) {
COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
+ COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
} else {
COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 5359204758f..1abd20ba276 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -196,6 +196,8 @@ protected:
// num of rows read from scanner
int64_t _num_rows_read = 0;
+ int64_t _num_byte_read = 0;
+
// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index 7eb98588892..4b2e4f76d03 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -150,7 +150,11 @@ Status
VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
statistics->merge(_sub_plan_query_statistics_recvr.get());
return Status::OK();
}
-
+// Merges only the sub-plan statistics reported by `sender_id`.
+Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) {
+    // Use the sender-aware ExecNode overload so any children are also collected per
+    // sender, consistent with ExecNode::collect_query_statistics(QueryStatistics*, int).
+    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics, sender_id));
+    statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
+    return Status::OK();
+}
+
Status VExchangeNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index c4f083dda48..94302e84d9b 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -56,6 +56,7 @@ public:
Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
void release_resource(RuntimeState* state) override;
Status collect_query_statistics(QueryStatistics* statistics) override;
+ Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
Status close(RuntimeState* state) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a6af6c634da..17f39756459 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -458,6 +458,17 @@ void VDataStreamRecvr::remove_sender(int sender_id, int
be_number, Status exec_s
_sender_queues[use_sender_id]->decrement_senders(be_number);
}
+// Local-exchange variant of remove_sender() that also hands over the sender's
+// query statistics (shared, not serialized).
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics,
+                                     Status exec_status) {
+    if (!exec_status.ok()) {
+        cancel_stream(exec_status);
+        return;
+    }
+    // Register the statistics *before* decrementing the sender count. Presumably,
+    // once the last sender is removed the consumer can reach EOS and merge the
+    // statistics, so inserting afterwards could let this sender's statistics be missed.
+    _sub_plan_query_statistics_recvr->insert(std::move(statistics), sender_id);
+    int use_sender_id = _is_merging ? sender_id : 0;
+    _sender_queues[use_sender_id]->decrement_senders(be_number);
+}
+
void VDataStreamRecvr::cancel_stream(Status exec_status) {
VLOG_QUERY << "cancel_stream: fragment_instance_id=" <<
print_id(_fragment_instance_id)
<< exec_status;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index d8ab873f87a..b2fd6afdc7f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -108,6 +108,9 @@ public:
// sender queue. Called from DataStreamMgr.
void remove_sender(int sender_id, int be_number, Status exec_status);
+ void remove_sender(int sender_id, int be_number, QueryStatisticsPtr
statistics,
+ Status exec_status);
+
void cancel_stream(Status exec_status);
void close();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 4f9816db0b4..bff65bd897e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -140,7 +140,13 @@ Status Channel<Parent>::send_local_block(Status
exec_status, bool eos) {
_local_recvr->add_block(&block, _parent->sender_id(), true);
if (eos) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
+ /// TODO: Supported on pipelineX, we can hold QueryStatistics on
the fragment instead of on instances.
+ if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
+ _local_recvr->remove_sender(_parent->sender_id(), _be_number,
+ _parent->query_statisticsPtr(),
exec_status);
+ } else {
+ _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
+ }
}
return Status::OK();
} else {
@@ -273,7 +279,12 @@ Status Channel<Parent>::close_internal(Status exec_status)
{
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (is_local()) {
if (_recvr_is_valid()) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
+ if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
+ _local_recvr->remove_sender(_parent->sender_id(),
_be_number,
+
_parent->query_statisticsPtr(), exec_status);
+ } else {
+ _local_recvr->remove_sender(_parent->sender_id(),
_be_number, exec_status);
+ }
}
} else {
status = send_remote_block((PBlock*)nullptr, true, exec_status);
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 203d59dc664..35f20d1b821 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -139,6 +139,7 @@ public:
}
MemTracker* mem_tracker() { return _mem_tracker.get(); }
QueryStatistics* query_statistics() { return _query_statistics.get(); }
+ QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; }
RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
segment_v2::CompressionTypePB& compression_type() { return
_compression_type; }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org