This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b2eed779c9 [feature](AuditLog) add scanRows scanBytes in auditlog 
(#25435)
6b2eed779c9 is described below

commit 6b2eed779c9bc3df551e61f5efeb6f663ad38d36
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 25 10:00:35 2023 +0800

    [feature](AuditLog) add scanRows scanBytes in auditlog (#25435)
---
 be/src/exec/exec_node.cpp                       |  8 +++++++
 be/src/exec/exec_node.h                         |  2 ++
 be/src/pipeline/exec/exchange_sink_buffer.cpp   |  8 +++++++
 be/src/pipeline/exec/exchange_sink_buffer.h     |  3 +++
 be/src/pipeline/exec/exchange_sink_operator.cpp |  1 +
 be/src/pipeline/exec/operator.h                 | 21 ++++++++++++++++
 be/src/pipeline/pipeline.h                      | 10 ++++++++
 be/src/pipeline/pipeline_fragment_context.cpp   |  2 ++
 be/src/pipeline/pipeline_task.cpp               | 25 +++++++++++++++++++
 be/src/pipeline/pipeline_task.h                 |  3 +++
 be/src/runtime/query_statistics.cpp             | 32 +++++++++++++------------
 be/src/runtime/query_statistics.h               | 20 ++++++++++++----
 be/src/vec/exec/scan/new_olap_scan_node.cpp     |  9 +++++--
 be/src/vec/exec/scan/new_olap_scan_node.h       |  1 +
 be/src/vec/exec/scan/new_olap_scanner.cpp       |  3 ++-
 be/src/vec/exec/scan/vscan_node.cpp             |  3 ++-
 be/src/vec/exec/scan/vscan_node.h               |  1 +
 be/src/vec/exec/scan/vscanner.cpp               |  2 ++
 be/src/vec/exec/scan/vscanner.h                 |  2 ++
 be/src/vec/exec/vexchange_node.cpp              |  6 ++++-
 be/src/vec/exec/vexchange_node.h                |  1 +
 be/src/vec/runtime/vdata_stream_recvr.cpp       | 11 +++++++++
 be/src/vec/runtime/vdata_stream_recvr.h         |  3 +++
 be/src/vec/sink/vdata_stream_sender.cpp         | 15 ++++++++++--
 be/src/vec/sink/vdata_stream_sender.h           |  1 +
 25 files changed, 167 insertions(+), 26 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4c6a734347e..ed8b11457a6 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -187,6 +187,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics* 
statistics) {
     return Status::OK();
 }
 
+Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
+    DCHECK(statistics != nullptr);
+    for (auto child_node : _children) {
+        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, 
sender_id));
+    }
+    return Status::OK();
+}
+
 void ExecNode::release_resource(doris::RuntimeState* state) {
     if (!_is_resource_released) {
         if (_rows_returned_counter != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 8aeecc2b8ca..d4d90546ffb 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -162,6 +162,8 @@ public:
     // error.
     [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics);
 
+    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics,
+                                                          int sender_id);
     // close() will get called for every exec node, regardless of what else is 
called and
     // the status of these calls (i.e. prepare() may never have been called, or
     // prepare()/open()/get_next() returned with an error).
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index b3d2222e503..577eb4a4622 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -237,6 +237,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        if (_statistics && _statistics->collected()) {
+            auto statistic = brpc_request->mutable_query_statistics();
+            _statistics->to_pb(statistic);
+        }
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
@@ -297,6 +301,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         if (request.block_holder->get_block()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
+        if (_statistics && _statistics->collected()) {
+            auto statistic = brpc_request->mutable_query_statistics();
+            _statistics->to_pb(statistic);
+        }
         auto* closure = request.channel->get_closure(id, request.eos, 
request.block_holder);
 
         ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c47d6c6a14b..d63dd2a55f9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -33,6 +33,7 @@
 
 #include "common/global_types.h"
 #include "common/status.h"
+#include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "service/backend_options.h"
 
@@ -188,6 +189,7 @@ public:
         _queue_dependency = queue_dependency;
         _finish_dependency = finish_dependency;
     }
+    void set_query_statistics(QueryStatistics* statistics) { _statistics = 
statistics; }
 
 private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
@@ -237,6 +239,7 @@ private:
     int _queue_capacity = 0;
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
     std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
+    QueryStatistics* _statistics = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 89679ceff89..047e336a1e2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -66,6 +66,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
             id, _dest_node_id, _sink->_sender_id, _state->be_number(), 
state->get_query_ctx());
 
+    _sink_buffer->set_query_statistics(_sink->query_statistics());
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->registe_channels(_sink_buffer.get());
     return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 125f8fd89e4..5362be88e89 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -174,6 +174,14 @@ public:
 
     virtual bool is_source() const;
 
+    virtual Status collect_query_statistics(QueryStatistics* statistics) { 
return Status::OK(); };
+
+    virtual Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
+        return Status::OK();
+    };
+
+    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {};
+
     virtual Status init(const TDataSink& tsink) { return Status::OK(); }
 
     // Prepare for running. (e.g. resource allocation, etc.)
@@ -318,6 +326,9 @@ public:
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override {
+        _sink->set_query_statistics(statistics);
+    }
 
 protected:
     NodeType* _sink;
@@ -388,6 +399,16 @@ public:
         return _node->runtime_profile();
     }
 
+    Status collect_query_statistics(QueryStatistics* statistics) override {
+        RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
+        return Status::OK();
+    }
+
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override {
+        RETURN_IF_ERROR(_node->collect_query_statistics(statistics, 
sender_id));
+        return Status::OK();
+    }
+
 protected:
     NodeType* _node;
     bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index c67ef6d29e2..7dcffb410a2 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -117,6 +117,14 @@ public:
     }
 
     [[nodiscard]] PipelineId id() const { return _pipeline_id; }
+    void set_is_root_pipeline() { _is_root_pipeline = true; }
+    bool is_root_pipeline() const { return _is_root_pipeline; }
+    void set_collect_query_statistics_with_every_batch() {
+        _collect_query_statistics_with_every_batch = true;
+    }
+    [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
+        return _collect_query_statistics_with_every_batch;
+    }
 
 private:
     void _init_profile();
@@ -168,6 +176,8 @@ private:
      */
     bool _always_can_read = false;
     bool _always_can_write = false;
+    bool _is_root_pipeline = false;
+    bool _collect_query_statistics_with_every_batch = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d7c6aebdc16..226b1f06951 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -320,6 +320,8 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     }
 
     _root_pipeline = fragment_context->add_pipeline();
+    _root_pipeline->set_is_root_pipeline();
+    _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
         RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index a46914fb601..7e907b318d2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -59,6 +59,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _root(_operators.back()),
           _sink(sink) {
     _pipeline_task_watcher.start();
+    _query_statistics.reset(new QueryStatistics());
+    _sink->set_query_statistics(_query_statistics);
+    _collect_query_statistics_with_every_batch =
+            _pipeline->collect_query_statistics_with_every_batch();
 }
 
 PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, 
RuntimeState* state,
@@ -295,6 +299,10 @@ Status PipelineTask::execute(bool* eos) {
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+            if (_data_state == SourceState::FINISHED ||
+                _collect_query_statistics_with_every_batch) {
+                RETURN_IF_ERROR(_collect_query_statistics());
+            }
             auto status = _sink->sink(_state, block, _data_state);
             if (UNLIKELY(!status.ok() || block->rows() == 0)) {
                 if (_fragment_context->is_group_commit()) {
@@ -331,6 +339,23 @@ Status PipelineTask::finalize() {
     return _sink->finalize(_state);
 }
 
+Status PipelineTask::_collect_query_statistics() {
+    // The exec node tree of a fragment is split into multiple pipelines; 
statistics only need to be collected from the root pipeline.
+    if (_pipeline->is_root_pipeline()) {
+        // If the current fragment has only one instance, we can collect all 
of them;
+        // otherwise, we need to collect them based on the sender_id.
+        if (_state->num_per_fragment_instances() == 1) {
+            _query_statistics->clear();
+            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
+        } else {
+            _query_statistics->clear();
+            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
+                                                            
_state->per_fragment_instance_idx()));
+        }
+    }
+    return Status::OK();
+}
+
 Status PipelineTask::try_close(Status exec_status) {
     if (_try_close_flag) {
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 41f60c27653..99e41421bb3 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -368,6 +368,9 @@ protected:
     int64_t _close_pipeline_time = 0;
 
     RuntimeProfile::Counter* _pip_task_total_timer;
+    std::shared_ptr<QueryStatistics> _query_statistics;
+    Status _collect_query_statistics();
+    bool _collect_query_statistics_with_every_batch = false;
 
 private:
     Operators _operators; // left is _source, right is _root
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index a6754215ace..22c18faa1e6 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -20,6 +20,8 @@
 #include <gen_cpp/data.pb.h>
 #include <glog/logging.h>
 
+#include <memory>
+
 namespace doris {
 
 void NodeStatistics::merge(const NodeStatistics& other) {
@@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
     recvr->merge(this);
 }
 
+void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
+    auto it = recvr->_query_statistics.find(sender_id);
+    if (it != recvr->_query_statistics.end()) {
+        merge(*it->second);
+    }
+}
+
 void QueryStatistics::clearNodeStatistics() {
     for (auto& pair : _nodes_statistics_map) {
         delete pair.second;
@@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() {
 
 void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int 
sender_id) {
     std::lock_guard<SpinLock> l(_lock);
-    QueryStatistics* query_statistics = nullptr;
-    auto iter = _query_statistics.find(sender_id);
-    if (iter == _query_statistics.end()) {
-        query_statistics = new QueryStatistics;
-        _query_statistics[sender_id] = query_statistics;
-    } else {
-        query_statistics = iter->second;
+    if (!_query_statistics.contains(sender_id)) {
+        _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
     }
-    query_statistics->from_pb(statistics);
+    _query_statistics[sender_id]->from_pb(statistics);
 }
 
-QueryStatisticsRecvr::~QueryStatisticsRecvr() {
-    // It is unnecessary to lock here, because the destructor will be
-    // called alter DataStreamRecvr's close in ExchangeNode.
-    for (auto& pair : _query_statistics) {
-        delete pair.second;
-    }
-    _query_statistics.clear();
+void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int 
sender_id) {
+    if (!statistics->collected()) return;
+    if (_query_statistics.contains(sender_id)) return;
+    std::lock_guard<SpinLock> l(_lock);
+    _query_statistics[sender_id] = statistics;
 }
 
 } // namespace doris
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index c4ace8f23e2..42c1457472f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include <map>
+#include <memory>
 #include <mutex>
 #include <unordered_map>
 #include <utility>
@@ -88,6 +89,7 @@ public:
 
     void merge(QueryStatisticsRecvr* recvr);
 
+    void merge(QueryStatisticsRecvr* recvr, int sender_id);
     // Get the maximum value from the peak memory collected by all node 
statistics
     int64_t calculate_max_peak_memory_bytes();
 
@@ -100,13 +102,18 @@ public:
         returned_rows = 0;
         max_peak_memory_bytes = 0;
         clearNodeStatistics();
+        // clear() is called right before collection, so calling it also marks 
these statistics as collected.
+        set_collected();
     }
 
     void to_pb(PQueryStatistics* statistics);
 
     void from_pb(const PQueryStatistics& statistics);
+    bool collected() const { return _collected; }
+    void set_collected() { _collected = true; }
 
 private:
+    friend class QueryStatisticsRecvr;
     int64_t scan_rows;
     int64_t scan_bytes;
     int64_t cpu_ms;
@@ -117,17 +124,22 @@ private:
     // only set once by result sink when closing.
     int64_t max_peak_memory_bytes;
     // The statistics of the query on each backend.
-    typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+    using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
     NodeStatisticsMap _nodes_statistics_map;
+    bool _collected = false;
 };
-
+using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
 class QueryStatisticsRecvr {
 public:
-    ~QueryStatisticsRecvr();
+    ~QueryStatisticsRecvr() = default;
 
+    // Transmitted via RPC, incurring serialization overhead.
     void insert(const PQueryStatistics& statistics, int sender_id);
 
+    // Transmitted via local exchange; the receiver only needs to hold a shared 
pointer.
+    void insert(QueryStatisticsPtr statistics, int sender_id);
+
 private:
     friend class QueryStatistics;
 
@@ -138,7 +150,7 @@ private:
         }
     }
 
-    std::map<int, QueryStatistics*> _query_statistics;
+    std::map<int, QueryStatisticsPtr> _query_statistics;
     SpinLock _lock;
 };
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 128fb310392..3bbff6c20a6 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
 Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
     RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
     if (!_is_pipeline_scan || _should_create_scanner) {
-        statistics->add_scan_bytes(_read_compressed_counter->value());
-        statistics->add_scan_rows(_raw_rows_counter->value());
+        statistics->add_scan_bytes(_byte_read_counter->value());
+        statistics->add_scan_rows(_rows_read_counter->value());
         statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
     }
     return Status::OK();
 }
 
+Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, 
int) {
+    RETURN_IF_ERROR(collect_query_statistics(statistics));
+    return Status::OK();
+}
+
 Status NewOlapScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
     // if you want to add some profile in scan node, even it have not new 
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 0725c37cf5e..cbbf3072872 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -64,6 +64,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
     Status collect_query_statistics(QueryStatistics* statistics) override;
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
 
     void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 26b5ead8156..eef7cf8271c 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -551,7 +551,8 @@ void NewOlapScanner::_update_realtime_counters() {
 }
 
 void NewOlapScanner::_update_counters_before_close() {
-    if (!_state->enable_profile() || _has_updated_counter) {
+    // Do not gate this on enable_profile(): QueryStatistics is populated from 
the counters updated here.
+    if (_has_updated_counter) {
         return;
     }
     _has_updated_counter = true;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 57dc6b66fd0..16ef362b96d 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -276,7 +276,8 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
 
 Status VScanNode::_init_profile() {
     // 1. counters for scan node
-    _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", 
TUnit::UNIT);
+    _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead", 
TUnit::UNIT);
+    _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead", 
TUnit::BYTES);
     _total_throughput_counter =
             runtime_profile()->add_rate_counter("TotalReadThroughput", 
_rows_read_counter);
     _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 2ecd6ac3202..458cb318b75 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -319,6 +319,7 @@ protected:
 
     // rows read from the scanner (including those discarded by (pre)filters)
     RuntimeProfile::Counter* _rows_read_counter;
+    RuntimeProfile::Counter* _byte_read_counter;
     // Wall based aggregate read throughput [rows/sec]
     RuntimeProfile::Counter* _total_throughput_counter;
     RuntimeProfile::Counter* _num_scanners;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 0a43b3dd3d8..3f900d1ca31 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -95,6 +95,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
                     break;
                 }
                 _num_rows_read += block->rows();
+                _num_byte_read += block->allocated_bytes();
             }
 
             // 2. Filter the output block finally.
@@ -191,6 +192,7 @@ void VScanner::_update_counters_before_close() {
     if (_parent) {
         COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
+        COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
     } else {
         COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 5359204758f..1abd20ba276 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -196,6 +196,8 @@ protected:
     // num of rows read from scanner
     int64_t _num_rows_read = 0;
 
+    int64_t _num_byte_read = 0;
+
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;
 
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index 7eb98588892..4b2e4f76d03 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -150,7 +150,11 @@ Status 
VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
     statistics->merge(_sub_plan_query_statistics_recvr.get());
     return Status::OK();
 }
-
+Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, 
int sender_id) {
+    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
+    statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
+    return Status::OK();
+}
 Status VExchangeNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index c4f083dda48..94302e84d9b 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -56,6 +56,7 @@ public:
     Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
     void release_resource(RuntimeState* state) override;
     Status collect_query_statistics(QueryStatistics* statistics) override;
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
     Status close(RuntimeState* state) override;
 
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a6af6c634da..17f39756459 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -458,6 +458,17 @@ void VDataStreamRecvr::remove_sender(int sender_id, int 
be_number, Status exec_s
     _sender_queues[use_sender_id]->decrement_senders(be_number);
 }
 
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, 
QueryStatisticsPtr statistics,
+                                     Status exec_status) {
+    if (!exec_status.ok()) {
+        cancel_stream(exec_status);
+        return;
+    }
+    int use_sender_id = _is_merging ? sender_id : 0;
+    _sender_queues[use_sender_id]->decrement_senders(be_number);
+    _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
+}
+
 void VDataStreamRecvr::cancel_stream(Status exec_status) {
     VLOG_QUERY << "cancel_stream: fragment_instance_id=" << 
print_id(_fragment_instance_id)
                << exec_status;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index d8ab873f87a..b2fd6afdc7f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -108,6 +108,9 @@ public:
     // sender queue. Called from DataStreamMgr.
     void remove_sender(int sender_id, int be_number, Status exec_status);
 
+    void remove_sender(int sender_id, int be_number, QueryStatisticsPtr 
statistics,
+                       Status exec_status);
+
     void cancel_stream(Status exec_status);
 
     void close();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 4f9816db0b4..bff65bd897e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -140,7 +140,13 @@ Status Channel<Parent>::send_local_block(Status 
exec_status, bool eos) {
 
         _local_recvr->add_block(&block, _parent->sender_id(), true);
         if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
+            /// TODO: Supported on pipelineX, we can hold QueryStatistics on 
the fragment instead of on instances.
+            if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
+                _local_recvr->remove_sender(_parent->sender_id(), _be_number,
+                                            _parent->query_statisticsPtr(), 
exec_status);
+            } else {
+                _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
+            }
         }
         return Status::OK();
     } else {
@@ -273,7 +279,12 @@ Status Channel<Parent>::close_internal(Status exec_status) 
{
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (is_local()) {
             if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
+                if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
+                    _local_recvr->remove_sender(_parent->sender_id(), 
_be_number,
+                                                
_parent->query_statisticsPtr(), exec_status);
+                } else {
+                    _local_recvr->remove_sender(_parent->sender_id(), 
_be_number, exec_status);
+                }
             }
         } else {
             status = send_remote_block((PBlock*)nullptr, true, exec_status);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 203d59dc664..35f20d1b821 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -139,6 +139,7 @@ public:
     }
     MemTracker* mem_tracker() { return _mem_tracker.get(); }
     QueryStatistics* query_statistics() { return _query_statistics.get(); }
+    QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
     bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; }
     RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
     segment_v2::CompressionTypePB& compression_type() { return 
_compression_type; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to