This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8a65b59edba [cherry-pick] Cherry-pick  report runtime workload 
statistics (#30717)
8a65b59edba is described below

commit 8a65b59edba13c1a4d3faad7ce27ae9850019731
Author: wangbo <[email protected]>
AuthorDate: Sun Feb 4 09:19:03 2024 +0800

    [cherry-pick] Cherry-pick  report runtime workload statistics (#30717)
    
    * [Feature](profile)Support report runtime workload statistics #29591
    [feature](auditlog)Add runtime cpu time/peak memory metric #29925
    [Fix](executor)Fix Grayscale upgrade be code dump when report statistics 
#29843
    
    * fix ut FragmentMgrTest.normal
---
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   3 +
 be/src/common/daemon.cpp                           |  18 ++
 be/src/common/daemon.h                             |   2 +
 be/src/exec/data_sink.cpp                          |  40 +---
 be/src/exec/data_sink.h                            |   8 -
 be/src/exec/exec_node.cpp                          |  21 +-
 be/src/exec/exec_node.h                            |  11 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |   8 -
 be/src/pipeline/exec/exchange_sink_buffer.h        |   4 -
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   1 -
 be/src/pipeline/exec/operator.h                    |  21 --
 be/src/pipeline/pipeline.h                         |   7 -
 be/src/pipeline/pipeline_fragment_context.cpp      |   6 +-
 be/src/pipeline/pipeline_fragment_context.h        |  11 -
 be/src/pipeline/pipeline_task.cpp                  |  38 +---
 be/src/pipeline/pipeline_task.h                    |   4 -
 be/src/runtime/buffer_control_block.cpp            |   4 +-
 be/src/runtime/buffer_control_block.h              |  19 +-
 be/src/runtime/exec_env.h                          |   6 +
 be/src/runtime/exec_env_init.cpp                   |   3 +
 be/src/runtime/fragment_mgr.cpp                    |  22 +-
 be/src/runtime/fragment_mgr.h                      |   1 -
 be/src/runtime/memory/mem_tracker.h                |   8 +
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   6 +
 be/src/runtime/plan_fragment_executor.cpp          |  76 ++-----
 be/src/runtime/plan_fragment_executor.h            |  21 +-
 be/src/runtime/query_context.h                     |  47 +++++
 be/src/runtime/query_statistics.cpp                |  22 +-
 be/src/runtime/query_statistics.h                  |  32 ++-
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 175 ++++++++++++++++
 be/src/runtime/runtime_query_statistics_mgr.h      |  59 ++++++
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |  15 --
 be/src/vec/exec/scan/new_olap_scan_node.h          |   2 -
 be/src/vec/exec/scan/vscan_node.cpp                |   3 +
 be/src/vec/exec/scan/vscanner.cpp                  |   7 +
 be/src/vec/exec/scan/vscanner.h                    |  12 +-
 be/src/vec/exec/vexchange_node.cpp                 |  17 +-
 be/src/vec/exec/vexchange_node.h                   |   4 -
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  16 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |  11 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  15 +-
 be/src/vec/runtime/vdata_stream_recvr.h            |  12 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  30 +--
 be/src/vec/sink/vdata_stream_sender.h              |  27 +--
 be/src/vec/sink/vresult_file_sink.cpp              |  16 +-
 be/src/vec/sink/vresult_file_sink.h                |   9 +-
 be/src/vec/sink/vresult_sink.cpp                   |   8 +-
 be/src/vec/sink/vresult_sink.h                     |   3 -
 be/test/vec/runtime/vdata_stream_test.cpp          |   8 +-
 .../main/java/org/apache/doris/common/Config.java  |  10 +
 .../main/java/org/apache/doris/catalog/Env.java    |  10 +
 .../java/org/apache/doris/plugin/AuditEvent.java   |   2 +
 .../java/org/apache/doris/qe/AuditLogHelper.java   |   2 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   5 +-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   9 +
 .../WorkloadRuntimeStatusMgr.java                  | 224 +++++++++++++++++++++
 gensrc/thrift/FrontendService.thrift               |   7 +
 58 files changed, 752 insertions(+), 440 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8524be30f44..6926b336181 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1123,6 +1123,10 @@ DEFINE_mBool(enable_column_type_check, "true");
 // Tolerance for the number of partition id 0 in rowset, default 0
 DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");
 
+DEFINE_mInt32(report_query_statistics_interval_ms, "3000");
+// 30s
+DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 65a3441033f..7f4d1c6b636 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1178,6 +1178,9 @@ DECLARE_mBool(enable_column_type_check);
 // Tolerance for the number of partition id 0 in rowset, default 0
 DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
 
+DECLARE_mInt32(report_query_statistics_interval_ms);
+DECLARE_mInt32(query_statistics_reserve_timeout_ms);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c72f7d66ed8..efaba2d861c 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -44,10 +44,13 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "runtime/block_spill_manager.h"
+#include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/load_channel_mgr.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/task_group/task_group_manager.h"
 #include "runtime/user_function_cache.h"
 #include "service/backend_options.h"
@@ -368,6 +371,13 @@ void Daemon::block_spill_gc_thread() {
     }
 }
 
+void Daemon::query_runtime_statistics_thread() {
+    while (!_stop_background_threads_latch.wait_for(
+            
std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
+    }
+}
+
 static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     bool init_system_metrics = config::enable_system_metrics;
     std::set<std::string> disk_devices;
@@ -474,6 +484,11 @@ void Daemon::start() {
     st = Thread::create(
             "Daemon", "block_spill_gc_thread", [this]() { 
this->block_spill_gc_thread(); },
             &_block_spill_gc_thread);
+    st = Thread::create(
+            "Daemon", "query_runtime_statistics_thread",
+            [this]() { this->query_runtime_statistics_thread(); },
+            &_query_runtime_statistics_thread);
+
     CHECK(st.ok()) << st;
 }
 
@@ -498,6 +513,9 @@ void Daemon::stop() {
     if (_block_spill_gc_thread) {
         _block_spill_gc_thread->join();
     }
+    if (_query_runtime_statistics_thread) {
+        _query_runtime_statistics_thread->join();
+    }
 }
 
 } // namespace doris
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 8266a9b9f0c..5ac9abbe2b1 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -50,6 +50,7 @@ private:
     void load_channel_tracker_refresh_thread();
     void calculate_metrics_thread();
     void block_spill_gc_thread();
+    void query_runtime_statistics_thread();
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _tcmalloc_gc_thread;
@@ -58,5 +59,6 @@ private:
     scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
     scoped_refptr<Thread> _calculate_metrics_thread;
     scoped_refptr<Thread> _block_spill_gc_thread;
+    scoped_refptr<Thread> _query_runtime_statistics_thread;
 };
 } // namespace doris
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c5d54466113..d9c7b7399a2 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -55,14 +55,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.stream_sink) {
             return Status::InternalError("Missing data stream sink.");
         }
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, params.sender_id, row_desc, 
thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, 
send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, 
params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, 
params.destinations,
+                                                      16 * 1024));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), 
thrift_sink.stream_sink));
         break;
     }
@@ -82,20 +78,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                     pool, params.sender_id, row_desc, 
thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, 
send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, 16 * 1024, output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs));
+                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, 
output_exprs));
         }
         break;
     }
@@ -192,14 +182,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.stream_sink) {
             return Status::InternalError("Missing data stream sink.");
         }
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, local_params.sender_id, row_desc, 
thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, 
send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, 
local_params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, 
params.destinations,
+                                                      16 * 1024));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), 
thrift_sink.stream_sink));
         break;
     }
@@ -219,20 +205,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                     pool, local_params.sender_id, row_desc, 
thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, 
send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, 16 * 1024, output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs));
+                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, 
output_exprs));
         }
         break;
     }
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index fd59cd1d27c..7f57330e014 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -38,7 +38,6 @@ class RuntimeState;
 class TPlanFragmentExecParams;
 class RowDescriptor;
 class DescriptorTbl;
-class QueryStatistics;
 class TDataSink;
 class TExpr;
 class TPipelineFragmentParams;
@@ -101,19 +100,12 @@ public:
     // Returns the runtime profile for the sink.
     virtual RuntimeProfile* profile() = 0;
 
-    virtual void set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-        _query_statistics = statistics;
-    }
-
 protected:
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
     bool _closed;
     std::string _name;
 
-    // Maybe this will be transferred to BufferControlBlock.
-    std::shared_ptr<QueryStatistics> _query_statistics;
-
     OpentelemetrySpan _span {};
 };
 
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 2ee3f1ba693..4ddd4be9fd1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -74,7 +74,6 @@
 #include "vec/utils/util.hpp"
 
 namespace doris {
-class QueryStatistics;
 
 const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
 
@@ -96,6 +95,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, 
const DescriptorTbl
     if (tnode.__isset.output_tuple_id) {
         _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
     }
+    _query_statistics = std::make_shared<QueryStatistics>();
 }
 
 ExecNode::~ExecNode() = default;
@@ -172,22 +172,6 @@ Status ExecNode::reset(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
-    DCHECK(statistics != nullptr);
-    for (auto child_node : _children) {
-        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
-    }
-    return Status::OK();
-}
-
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
-    DCHECK(statistics != nullptr);
-    for (auto child_node : _children) {
-        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, 
sender_id));
-    }
-    return Status::OK();
-}
-
 void ExecNode::release_resource(doris::RuntimeState* state) {
     if (!_is_resource_released) {
         if (_rows_returned_counter != nullptr) {
@@ -271,6 +255,9 @@ Status ExecNode::create_tree_helper(RuntimeState* state, 
ObjectPool* pool,
     int num_children = tnodes[*node_idx].num_children;
     ExecNode* node = nullptr;
     RETURN_IF_ERROR(create_node(state, pool, tnodes[*node_idx], descs, &node));
+    if (node != nullptr) {
+        
state->get_query_ctx()->register_query_statistics(node->get_query_statistics());
+    }
 
     // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
     if (parent != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index d92f884204b..e688fb7f889 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -157,13 +157,6 @@ public:
     // so should be fast.
     [[nodiscard]] virtual Status reset(RuntimeState* state);
 
-    // This should be called before close() and after get_next(), it is 
responsible for
-    // collecting statistics sent with row batch, it can't be called when 
prepare() returns
-    // error.
-    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics);
-
-    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics,
-                                                          int sender_id);
     // close() will get called for every exec node, regardless of what else is 
called and
     // the status of these calls (i.e. prepare() may never have been called, or
     // prepare()/open()/get_next() returned with an error).
@@ -242,6 +235,8 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
+    std::shared_ptr<QueryStatistics> get_query_statistics() { return 
_query_statistics; }
+
 protected:
     friend class DataSink;
 
@@ -334,6 +329,8 @@ protected:
 
     std::atomic<bool> _can_read = false;
 
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
+
 private:
     friend class pipeline::OperatorBase;
     bool _is_closed;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index a605b7fd04c..93d675876b6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -190,10 +190,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
-        if (_statistics && _statistics->collected()) {
-            auto statistic = brpc_request->mutable_query_statistics();
-            _statistics->to_pb(statistic);
-        }
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
@@ -251,10 +247,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         if (request.block_holder->get_block()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
-        if (_statistics && _statistics->collected()) {
-            auto statistic = brpc_request->mutable_query_statistics();
-            _statistics->to_pb(statistic);
-        }
         auto* closure = request.channel->get_closure(id, request.eos, 
request.block_holder);
 
         ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index b31e4d33608..4543c4aa6df 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -175,8 +175,6 @@ public:
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
-    void set_query_statistics(QueryStatistics* statistics) { _statistics = 
statistics; }
-
 private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
@@ -215,8 +213,6 @@ private:
     inline bool _is_receiver_eof(InstanceLoId id);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
-
-    QueryStatistics* _statistics = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index a62e1aedbc8..4b8edbcf118 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -67,7 +67,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, 
_sink->_sender_id,
                                                         _state->be_number(), 
_context);
 
-    _sink_buffer->set_query_statistics(_sink->query_statistics());
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->registe_channels(_sink_buffer.get());
     return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4a68a95e366..d98b4f16bd4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,14 +163,6 @@ public:
 
     bool is_source() const;
 
-    virtual Status collect_query_statistics(QueryStatistics* statistics) { 
return Status::OK(); };
-
-    virtual Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
-        return Status::OK();
-    };
-
-    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {};
-
     virtual Status init(const TDataSink& tsink) { return Status::OK(); }
 
     // Prepare for running. (e.g. resource allocation, etc.)
@@ -307,9 +299,6 @@ public:
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override {
-        _sink->set_query_statistics(statistics);
-    }
 
 protected:
     NodeType* _sink;
@@ -380,16 +369,6 @@ public:
         return _node->runtime_profile();
     }
 
-    Status collect_query_statistics(QueryStatistics* statistics) override {
-        RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
-        return Status::OK();
-    }
-
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override {
-        RETURN_IF_ERROR(_node->collect_query_statistics(statistics, 
sender_id));
-        return Status::OK();
-    }
-
 protected:
     NodeType* _node;
     bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 056c331dd0c..f570865b23f 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -104,12 +104,6 @@ public:
 
     void set_is_root_pipeline() { _is_root_pipeline = true; }
     bool is_root_pipeline() const { return _is_root_pipeline; }
-    void set_collect_query_statistics_with_every_batch() {
-        _collect_query_statistics_with_every_batch = true;
-    }
-    [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
-        return _collect_query_statistics_with_every_batch;
-    }
 
 private:
     void _init_profile();
@@ -155,7 +149,6 @@ private:
     bool _always_can_read = false;
     bool _always_can_write = false;
     bool _is_root_pipeline = false;
-    bool _collect_query_statistics_with_every_batch = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 159d9054ae0..f3e1e4ef292 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -323,7 +323,6 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
 
     _root_pipeline = fragment_context->add_pipeline();
     _root_pipeline->set_is_root_pipeline();
-    _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
         RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
@@ -806,7 +805,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
             _multi_cast_stream_sink_senders[i].reset(new 
vectorized::VDataStreamSender(
                     _runtime_state.get(), _runtime_state->obj_pool(), 
sender_id, row_desc,
                     thrift_sink.multi_cast_stream_sink.sinks[i],
-                    thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 
1024, false));
+                    thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 
1024));
 
             // 2. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
@@ -880,8 +879,7 @@ void PipelineFragmentContext::send_report(bool done) {
              _fragment_instance_id, _backend_num, _runtime_state.get(),
              std::bind(&PipelineFragmentContext::update_status, this, 
std::placeholders::_1),
              std::bind(&PipelineFragmentContext::cancel, this, 
std::placeholders::_1,
-                       std::placeholders::_2),
-             _dml_query_statistics()});
+                       std::placeholders::_2)});
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index a80c77959d0..e39428da835 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,10 +121,6 @@ public:
         return _task_group_entity;
     }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> 
query_statistics) {
-        _query_statistics = query_statistics;
-    }
-
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
     Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -207,13 +203,6 @@ protected:
 
     DescriptorTbl* _desc_tbl = nullptr;
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-    std::shared_ptr<QueryStatistics> _dml_query_statistics() {
-        if (_query_statistics && _query_statistics->collect_dml_statistics()) {
-            return _query_statistics;
-        }
-        return nullptr;
-    }
-    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index fa443c01b23..4e4c89de881 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -58,11 +58,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _fragment_context(fragment_context),
           _parent_profile(parent_profile) {
     _pipeline_task_watcher.start();
-    _query_statistics.reset(new 
QueryStatistics(state->query_options().query_type));
-    _sink->set_query_statistics(_query_statistics);
-    _collect_query_statistics_with_every_batch =
-            _pipeline->collect_query_statistics_with_every_batch();
-    fragment_context->set_query_statistics(_query_statistics);
 }
 
 void PipelineTask::_fresh_profile_counter() {
@@ -194,14 +189,24 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
 
 Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_CPU_TIMER(_task_cpu_timer);
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
     int64_t time_spent = 0;
+
+    ThreadCpuStopWatch cpu_time_stop_watch;
+    cpu_time_stop_watch.start();
+
     Defer defer {[&]() {
         if (_task_queue) {
             _task_queue->update_statistics(this, time_spent);
         }
+
+        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+        _task_cpu_timer->update(delta_cpu_time);
+        auto cpu_qs = query_context()->get_cpu_statistics();
+        if (cpu_qs) {
+            cpu_qs->add_cpu_nanos(delta_cpu_time);
+        }
     }};
     // The status must be runnable
     *eos = false;
@@ -259,10 +264,6 @@ Status PipelineTask::execute(bool* eos) {
         *eos = _data_state == SourceState::FINISHED;
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            if (_data_state == SourceState::FINISHED ||
-                _collect_query_statistics_with_every_batch) {
-                RETURN_IF_ERROR(_collect_query_statistics());
-            }
             auto status = _sink->sink(_state, block, _data_state);
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
@@ -289,23 +290,6 @@ Status PipelineTask::finalize() {
     return _sink->finalize(_state);
 }
 
-Status PipelineTask::_collect_query_statistics() {
-    // The execnode tree of a fragment will be split into multiple pipelines, 
we only need to collect the root pipeline.
-    if (_pipeline->is_root_pipeline()) {
-        // If the current fragment has only one instance, we can collect all 
of them;
-        // otherwise, we need to collect them based on the sender_id.
-        if (_state->num_per_fragment_instances() == 1) {
-            _query_statistics->clear();
-            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
-        } else {
-            _query_statistics->clear();
-            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
-                                                            
_state->per_fragment_instance_idx()));
-        }
-    }
-    return Status::OK();
-}
-
 Status PipelineTask::try_close() {
     if (_try_close_flag) {
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ea78d795006..32fcd58ba05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -340,9 +340,5 @@ private:
     int64_t _close_pipeline_time = 0;
 
     RuntimeProfile::Counter* _pip_task_total_timer;
-
-    std::shared_ptr<QueryStatistics> _query_statistics;
-    Status _collect_query_statistics();
-    bool _collect_query_statistics_with_every_batch = false;
 };
 } // namespace doris::pipeline
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 07c40d2d9b7..62c1763861a 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -95,7 +95,9 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, 
int buffer_size)
           _is_cancelled(false),
           _buffer_rows(0),
           _buffer_limit(buffer_size),
-          _packet_num(0) {}
+          _packet_num(0) {
+    _query_statistics = std::make_unique<QueryStatistics>();
+}
 
 BufferControlBlock::~BufferControlBlock() {
     cancel();
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index c2275ac0d17..0bb38c54e00 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -80,11 +80,7 @@ public:
 
     const TUniqueId& fragment_id() const { return _fragment_id; }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
-        _query_statistics = statistics;
-    }
-
-    void update_num_written_rows(int64_t num_rows) {
+    void update_return_rows(int64_t num_rows) {
         // _query_statistics may be null when the result sink init failed
         // or some other failure.
         // and the number of written rows is only needed when all things go 
well.
@@ -93,13 +89,6 @@ public:
         }
     }
 
-    void update_max_peak_memory_bytes() {
-        if (_query_statistics != nullptr) {
-            int64_t max_peak_memory_bytes = 
_query_statistics->calculate_max_peak_memory_bytes();
-            
_query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
-        }
-    }
-
 protected:
     virtual bool _get_batch_queue_empty() { return _batch_queue.empty(); }
     virtual void _update_batch_queue_empty() {}
@@ -126,10 +115,8 @@ protected:
 
     std::deque<GetResultBatchCtx*> _waiting_rpc;
 
-    // It is shared with PlanFragmentExecutor and will be called in two 
different
-    // threads. But their calls are all at different time, there is no problem 
of
-    // multithreading access.
-    std::shared_ptr<QueryStatistics> _query_statistics;
+    // only used for FE using return rows to check limit
+    std::unique_ptr<QueryStatistics> _query_statistics;
 };
 
 class PipBufferControlBlock : public BufferControlBlock {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 2032d788392..d66e3688d81 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -56,6 +56,7 @@ class MemTracker;
 class StorageEngine;
 class ResultBufferMgr;
 class ResultQueueMgr;
+class RuntimeQueryStatiticsMgr;
 class TMasterInfo;
 class LoadChannelMgr;
 class StreamLoadExecutor;
@@ -113,6 +114,10 @@ public:
     }
     taskgroup::TaskGroupManager* task_group_manager() { return 
_task_group_manager; }
 
+    RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
+        return _runtime_query_statistics_mgr;
+    }
+
     // using template to simplify client cache management
     template <typename T>
     inline ClientCache<T>* get_client_cache() {
@@ -261,6 +266,7 @@ private:
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
+    RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d0fab65a75f..5ff83652a95 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -62,6 +62,7 @@
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/routine_load/routine_load_task_executor.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/small_file_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
@@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .set_max_queue_size(config::fragment_pool_queue_size)
             .build(&_join_node_thread_pool);
 
+    _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _task_group_manager = new taskgroup::TaskGroupManager();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
@@ -422,6 +424,7 @@ void ExecEnv::_destroy() {
     _brpc_iobuf_block_memory_tracker.reset();
     InvertedIndexSearcherCache::reset_global_instance();
 
+    SAFE_DELETE(_runtime_query_statistics_mgr);
     _is_init = false;
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ab915870c24..1529d66def2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -173,8 +173,7 @@ public:
 
 private:
     void coordinator_callback(const Status& status, RuntimeProfile* profile,
-                              RuntimeProfile* load_channel_profile, bool done,
-                              std::shared_ptr<QueryStatistics> 
query_statistics);
+                              RuntimeProfile* load_channel_profile, bool done);
 
     // Id of this query
     TUniqueId _query_id;
@@ -219,8 +218,7 @@ FragmentExecState::FragmentExecState(const TUniqueId& 
query_id,
           _query_ctx(std::move(query_ctx)),
           _executor(exec_env, 
std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
                                               this, std::placeholders::_1, 
std::placeholders::_2,
-                                              std::placeholders::_3, 
std::placeholders::_4,
-                                              std::placeholders::_5)),
+                                              std::placeholders::_3, 
std::placeholders::_4)),
           _set_rsc_info(false),
           _timeout_second(-1),
           _report_status_cb_impl(report_status_cb_impl) {
@@ -306,15 +304,13 @@ Status FragmentExecState::cancel(const 
PPlanFragmentCancelReason& reason, const
 // Also, the reported status will always reflect the most recent execution 
status,
 // including the final status when execution finishes.
 void FragmentExecState::coordinator_callback(const Status& status, 
RuntimeProfile* profile,
-                                             RuntimeProfile* 
load_channel_profile, bool done,
-                                             std::shared_ptr<QueryStatistics> 
query_statistics) {
+                                             RuntimeProfile* 
load_channel_profile, bool done) {
     _report_status_cb_impl(
             {status, profile, load_channel_profile, done, _coord_addr, 
_query_id, -1,
              _fragment_instance_id, _backend_num, _executor.runtime_state(),
              std::bind(&FragmentExecState::update_status, this, 
std::placeholders::_1),
              std::bind(&PlanFragmentExecutor::cancel, &_executor, 
std::placeholders::_1,
-                       std::placeholders::_2),
-             query_statistics});
+                       std::placeholders::_2)});
     DCHECK(status.ok() || done); // if !status.ok() => done
 }
 
@@ -406,13 +402,6 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
 
     DCHECK(req.runtime_state != nullptr);
 
-    if (req.query_statistics) {
-        TQueryStatistics queryStatistics;
-        DCHECK(req.query_statistics->collect_dml_statistics());
-        req.query_statistics->to_thrift(&queryStatistics);
-        params.__set_query_statistics(queryStatistics);
-    }
-
     if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && 
req.status.ok()) {
         // this is a load plan, and load is not finished, just make a brief 
report
         params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
@@ -746,6 +735,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
             query_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
+        query_ctx->register_memory_statistics();
+        query_ctx->register_cpu_statistics();
+
         if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
             if (params.__isset.workload_groups && 
!params.workload_groups.empty()) {
                 taskgroup::TaskGroupInfo task_group_info;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 1394e54f531..f6a37c74102 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -79,7 +79,6 @@ struct ReportStatusRequest {
     RuntimeState* runtime_state;
     std::function<Status(Status)> update_fn;
     std::function<void(const PPlanFragmentCancelReason&, const std::string&)> 
cancel_fn;
-    std::shared_ptr<QueryStatistics> query_statistics;
 };
 
 // This class used to manage all the fragment execute in this instance
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index 62c78294601..cf2b06d2061 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -33,6 +33,7 @@
 
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "runtime/query_statistics.h"
 #include "util/pretty_printer.h"
 
 namespace doris {
@@ -139,6 +140,9 @@ public:
             return;
         }
         _consumption->add(bytes);
+        if (_query_statistics) {
+            
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
+        }
     }
 
     void consume_no_update_peak(int64_t bytes) { // need extreme fast
@@ -149,6 +153,8 @@ public:
 
     void set_consumption(int64_t bytes) { _consumption->set(bytes); }
 
+    std::shared_ptr<QueryStatistics> get_query_statistics() { return 
_query_statistics; }
+
 public:
     virtual Snapshot make_snapshot() const;
     // Specify group_num from mem_tracker_pool to generate snapshot.
@@ -181,6 +187,8 @@ protected:
 
     // Iterator into mem_tracker_pool for this object. Stored to have O(1) 
remove.
     std::list<MemTracker*>::iterator _tracker_group_it;
+
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 15c55e3eca4..ea255552aa2 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -78,6 +78,12 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const 
std::string& label, int64_
     } else {
         _group_num = random() % 999 + 1;
     }
+
+    // currently only select/load need runtime query statistics
+    if (_type == Type::LOAD || _type == Type::QUERY) {
+        _query_statistics = std::make_shared<QueryStatistics>();
+    }
+
     {
         std::lock_guard<std::mutex> 
l(mem_tracker_limiter_pool[_group_num].group_lock);
         _tracker_limiter_group_it = 
mem_tracker_limiter_pool[_group_num].trackers.insert(
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 2792189e8e2..d3f05b32f7c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -82,9 +82,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _closed(false),
           _is_report_success(false),
           _is_report_on_cancel(true),
-          _collect_query_statistics_with_every_batch(false),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
     _report_thread_future = _report_thread_promise.get_future();
+    _query_statistics = std::make_shared<QueryStatistics>();
 }
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
@@ -127,6 +127,7 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
     _runtime_state->set_query_mem_tracker(query_ctx == nullptr ? 
_exec_env->orphan_mem_tracker()
                                                                : 
query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
+    query_ctx->register_query_statistics(_query_statistics);
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
     _runtime_state->runtime_filter_mgr()->init();
@@ -219,11 +220,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
         if (sink_profile != nullptr) {
             profile()->add_child(sink_profile, true, nullptr);
         }
-
-        _collect_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
     } else {
         // _sink is set to nullptr
         _sink.reset(nullptr);
@@ -238,11 +234,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
 
     VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
     _prepared = true;
-
-    _query_statistics.reset(new 
QueryStatistics(request.query_options.query_type));
-    if (_sink != nullptr) {
-        _sink->set_query_statistics(_query_statistics);
-    }
     return Status::OK();
 }
 
@@ -304,7 +295,13 @@ Status PlanFragmentExecutor::open() {
 Status PlanFragmentExecutor::open_vectorized_internal() {
     SCOPED_TIMER(profile()->total_time_counter());
     {
-        SCOPED_CPU_TIMER(_fragment_cpu_timer);
+        ThreadCpuStopWatch cpu_time_stop_watch;
+        cpu_time_stop_watch.start();
+        Defer defer {[&]() {
+            int64_t cpu_time = cpu_time_stop_watch.elapsed_time();
+            _fragment_cpu_timer->update(cpu_time);
+        }};
+
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
         RETURN_IF_CANCELLED(_runtime_state);
         if (_sink == nullptr) {
@@ -314,15 +311,18 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         doris::vectorized::Block block;
         bool eos = false;
 
+        int64_t old_cpu_time = cpu_time_stop_watch.elapsed_time();
         while (!eos) {
+            Defer defer {[&]() {
+                int64_t current_cpu_time = cpu_time_stop_watch.elapsed_time();
+                int64_t delta_time = current_cpu_time - old_cpu_time;
+                _query_statistics->add_cpu_nanos(delta_time);
+                old_cpu_time = current_cpu_time;
+            }};
+
             RETURN_IF_CANCELLED(_runtime_state);
             RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
 
-            // Collect this plan and sub plan statistics, and send to parent 
plan.
-            if (_collect_query_statistics_with_every_batch) {
-                _collect_query_statistics();
-            }
-
             if (!eos || block.rows() > 0) {
                 auto st = _sink->send(runtime_state(), &block);
                 if (st.is<END_OF_FILE>()) {
@@ -333,7 +333,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         }
     }
     {
-        _collect_query_statistics();
         Status status;
         {
             std::lock_guard<std::mutex> l(_status_lock);
@@ -370,44 +369,6 @@ Status 
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
     return Status::OK();
 }
 
-void PlanFragmentExecutor::_collect_query_statistics() {
-    _query_statistics->clear();
-    Status status;
-    /// TODO(yxc):
-    // The judgment of enable_local_exchange here is a bug, it should not need 
to be checked. I will fix this later.
-    bool _is_local = false;
-    if (_runtime_state->query_options().__isset.enable_local_exchange) {
-        _is_local = _runtime_state->query_options().enable_local_exchange;
-    }
-
-    if (_is_local) {
-        if (_runtime_state->num_per_fragment_instances() == 1) {
-            status = _plan->collect_query_statistics(_query_statistics.get());
-        } else {
-            status = _plan->collect_query_statistics(_query_statistics.get(),
-                                                     
_runtime_state->per_fragment_instance_idx());
-        }
-    } else {
-        status = _plan->collect_query_statistics(_query_statistics.get());
-    }
-
-    if (!status.ok()) {
-        LOG(INFO) << "collect query statistics failed, st=" << status;
-        return;
-    }
-    _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / 
NANOS_PER_MILLIS);
-    if (_runtime_state->backend_id() != -1) {
-        _collect_node_statistics();
-    }
-}
-
-void PlanFragmentExecutor::_collect_node_statistics() {
-    DCHECK(_runtime_state->backend_id() != -1);
-    NodeStatistics* node_statistics =
-            
_query_statistics->add_nodes_statistics(_runtime_state->backend_id());
-    
node_statistics->set_peak_memory(_runtime_state->query_mem_tracker()->peak_consumption());
-}
-
 void PlanFragmentExecutor::report_profile() {
     SCOPED_ATTACH_TASK(_runtime_state.get());
     VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
@@ -487,8 +448,7 @@ void PlanFragmentExecutor::send_report(bool done) {
     // but fragments still need to be cancelled (e.g. limit reached), the 
coordinator will
     // be waiting for a final report and profile.
     _report_status_cb(status, _is_report_success ? profile() : nullptr,
-                      _is_report_success ? load_channel_profile() : nullptr, 
done || !status.ok(),
-                      _dml_query_statistics());
+                      _is_report_success ? load_channel_profile() : nullptr, 
done || !status.ok());
 }
 
 void PlanFragmentExecutor::stop_report_thread() {
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 565c9047c4b..5c048753f8e 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -47,7 +47,6 @@ class DataSink;
 class DescriptorTbl;
 class ExecEnv;
 class ObjectPool;
-class QueryStatistics;
 
 namespace vectorized {
 class Block;
@@ -81,8 +80,7 @@ public:
     // functions like PrettyPrint() or to_thrift(), neither of which is const
     // because they take locks.
     using report_status_callback =
-            std::function<void(const Status&, RuntimeProfile*, 
RuntimeProfile*, bool,
-                               std::shared_ptr<QueryStatistics>)>;
+            std::function<void(const Status&, RuntimeProfile*, 
RuntimeProfile*, bool)>;
 
     // report_status_cb, if !empty(), is used to report the accumulated profile
     // information periodically during execution (open() or get_next()).
@@ -198,12 +196,6 @@ private:
 
     RuntimeProfile::Counter* _fragment_cpu_timer;
 
-    // It is shared with BufferControlBlock and will be called in two different
-    // threads. But their calls are all at different time, there is no problem 
of
-    // multithreaded access.
-    std::shared_ptr<QueryStatistics> _query_statistics;
-    bool _collect_query_statistics_with_every_batch;
-
     // Record the cancel information when calling the cancel() method, return 
it to FE
     PPlanFragmentCancelReason _cancel_reason;
     std::string _cancel_msg;
@@ -244,16 +236,7 @@ private:
 
     const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); 
}
 
-    void _collect_query_statistics();
-
-    std::shared_ptr<QueryStatistics> _dml_query_statistics() {
-        if (_query_statistics && _query_statistics->collect_dml_statistics()) {
-            return _query_statistics;
-        }
-        return nullptr;
-    }
-
-    void _collect_node_statistics();
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c6b996bcae9..8746483df4c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,6 +33,7 @@
 #include "runtime/query_statistics.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "task_group/task_group.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
@@ -77,6 +78,11 @@ public:
         if (_task_group) {
             _task_group->remove_mem_tracker_limiter(query_mem_tracker);
         }
+        if (_exec_env &&
+            _exec_env
+                    ->runtime_query_statistics_mgr()) { // for BE ut 
FragmentMgrTest.Normal, Meaningless
+            
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(query_id));
+        }
     }
 
     // Notice. For load fragments, the fragment_num sent by FE has a small 
probability of 0.
@@ -174,6 +180,45 @@ public:
 
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); 
}
 
+    void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
+        
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id),
 qs,
+                                                                             
coord_addr);
+    }
+
+    std::shared_ptr<QueryStatistics> get_query_statistics() {
+        return 
_exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
+                print_id(query_id));
+    }
+
+    void register_memory_statistics() {
+        if (query_mem_tracker) {
+            std::shared_ptr<QueryStatistics> qs = 
query_mem_tracker->get_query_statistics();
+            std::string query_id_str = print_id(query_id);
+            if (qs) {
+                if (_exec_env &&
+                    _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
+                    
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+                            query_id_str, qs, coord_addr);
+                }
+            } else {
+                LOG(INFO) << " query " << query_id_str << " get memory query 
statistics failed ";
+            }
+        }
+    }
+
+    void register_cpu_statistics() {
+        if (!_cpu_statistics) {
+            _cpu_statistics = std::make_shared<QueryStatistics>();
+            if (_exec_env &&
+                _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
+                
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+                        print_id(query_id), _cpu_statistics, coord_addr);
+            }
+        }
+    }
+
+    std::shared_ptr<QueryStatistics> get_cpu_statistics() { return 
_cpu_statistics; }
+
 public:
     TUniqueId query_id;
     DescriptorTbl* desc_tbl;
@@ -227,6 +272,8 @@ private:
     taskgroup::TaskGroupPtr _task_group;
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
     const TQueryOptions _query_options;
+
+    std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index 7630362f5eb..0a20f6d6e0a 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -22,6 +22,8 @@
 
 #include <memory>
 
+#include "util/time.h"
+
 namespace doris {
 
 void NodeStatistics::merge(const NodeStatistics& other) {
@@ -40,7 +42,13 @@ void NodeStatistics::from_pb(const PNodeStatistics& 
node_statistics) {
 void QueryStatistics::merge(const QueryStatistics& other) {
     scan_rows += other.scan_rows;
     scan_bytes += other.scan_bytes;
-    cpu_ms += other.cpu_ms;
+    cpu_nanos += other.cpu_nanos;
+
+    int64_t other_peak_mem = other.max_peak_memory_bytes.load();
+    if (other_peak_mem > this->max_peak_memory_bytes) {
+        this->max_peak_memory_bytes = other_peak_mem;
+    }
+
     for (auto& other_node_statistics : other._nodes_statistics_map) {
         int64_t node_id = other_node_statistics.first;
         auto node_statistics = add_nodes_statistics(node_id);
@@ -52,7 +60,6 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
     DCHECK(statistics != nullptr);
     statistics->set_scan_rows(scan_rows);
     statistics->set_scan_bytes(scan_bytes);
-    statistics->set_cpu_ms(cpu_ms);
     statistics->set_returned_rows(returned_rows);
     statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
     for (auto iter = _nodes_statistics_map.begin(); iter != 
_nodes_statistics_map.end(); ++iter) {
@@ -64,17 +71,16 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
 
 void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
     DCHECK(statistics != nullptr);
-    statistics->scan_bytes = scan_bytes;
-    statistics->scan_rows = scan_rows;
-    statistics->cpu_ms = cpu_ms;
-    statistics->returned_rows = returned_rows;
-    statistics->max_peak_memory_bytes = max_peak_memory_bytes;
+    statistics->__set_scan_bytes(scan_bytes);
+    statistics->__set_scan_rows(scan_rows);
+    statistics->__set_cpu_ms(cpu_nanos.load() / NANOS_PER_MILLIS);
+    statistics->__set_returned_rows(returned_rows);
+    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
 }
 
 void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
     scan_rows = statistics.scan_rows();
     scan_bytes = statistics.scan_bytes();
-    cpu_ms = statistics.cpu_ms();
     for (auto& p_node_statistics : statistics.nodes_statistics()) {
         int64_t node_id = p_node_statistics.node_id();
         auto node_statistics = add_nodes_statistics(node_id);
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index 4923e727961..e9df0e338ab 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -59,13 +59,12 @@ private:
 // or plan's statistics and QueryStatisticsRecvr is responsible for collecting 
it.
 class QueryStatistics {
 public:
-    QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT)
+    QueryStatistics()
             : scan_rows(0),
               scan_bytes(0),
-              cpu_ms(0),
+              cpu_nanos(0),
               returned_rows(0),
-              max_peak_memory_bytes(0),
-              _query_type(query_type) {}
+              max_peak_memory_bytes(0) {}
     virtual ~QueryStatistics();
 
     void merge(const QueryStatistics& other);
@@ -74,7 +73,7 @@ public:
 
     void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; }
 
-    void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
+    void add_cpu_nanos(int64_t cpu_nanos) { this->cpu_nanos += cpu_nanos; }
 
     NodeStatistics* add_nodes_statistics(int64_t node_id) {
         NodeStatistics* nodeStatistics = nullptr;
@@ -103,9 +102,10 @@ public:
     void clearNodeStatistics();
 
     void clear() {
-        scan_rows = 0;
-        scan_bytes = 0;
-        cpu_ms = 0;
+        scan_rows.store(0);
+        scan_bytes.store(0);
+
+        cpu_nanos = 0;
         returned_rows = 0;
         max_peak_memory_bytes = 0;
         clearNodeStatistics();
@@ -119,25 +119,23 @@ public:
     bool collected() const { return _collected; }
     void set_collected() { _collected = true; }
 
-    // LOAD does not need to collect information on the exchange node.
-    bool collect_dml_statistics() { return _query_type == TQueryType::LOAD; }
+    int64_t get_scan_rows() { return scan_rows.load(); }
+    int64_t get_scan_bytes() { return scan_bytes.load(); }
 
 private:
     friend class QueryStatisticsRecvr;
-    int64_t scan_rows;
-    int64_t scan_bytes;
-    int64_t cpu_ms;
+    std::atomic<int64_t> scan_rows;
+    std::atomic<int64_t> scan_bytes;
+    std::atomic<int64_t> cpu_nanos;
+
     // number rows returned by query.
     // only set once by result sink when closing.
     int64_t returned_rows;
-    // Maximum memory peak for all backends.
-    // only set once by result sink when closing.
-    int64_t max_peak_memory_bytes;
+    std::atomic<int64_t> max_peak_memory_bytes;
     // The statistics of the query on each backend.
     using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
     NodeStatisticsMap _nodes_statistics_map;
     bool _collected = false;
-    const TQueryType::type _query_type;
 };
 using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
new file mode 100644
index 00000000000..f9ada4218df
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_query_statistics_mgr.h"
+
+#include <gen_cpp/HeartbeatService_types.h>
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "util/debug_util.h"
+#include "util/time.h"
+
+namespace doris {
+
+void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
+                                                         
std::shared_ptr<QueryStatistics> qs_ptr,
+                                                         TNetworkAddress 
fe_addr) {
+    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+    if (_query_statistics_ctx_map.find(query_id) == 
_query_statistics_ctx_map.end()) {
+        _query_statistics_ctx_map[query_id] = 
std::make_unique<QueryStatisticsCtx>(fe_addr);
+    }
+    _query_statistics_ctx_map.at(query_id)->qs_list.push_back(qs_ptr);
+}
+
+void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
+    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    // 1 get query statistics map
+    std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> 
fe_qs_map;
+    std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, 
timeout>
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        int64_t current_time = MonotonicMillis();
+        int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
+        for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+            if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) {
+                std::map<std::string, TQueryStatistics> tmp_map;
+                fe_qs_map[qs_ctx_ptr->fe_addr] = std::move(tmp_map);
+            }
+
+            QueryStatistics tmp_qs;
+            for (auto& qs_ptr : qs_ctx_ptr->qs_list) {
+                tmp_qs.merge(*qs_ptr);
+            }
+            TQueryStatistics ret_t_qs;
+            tmp_qs.to_thrift(&ret_t_qs);
+            fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs;
+
+            bool is_query_finished = qs_ctx_ptr->is_query_finished;
+            bool is_timeout_after_finish = false;
+            if (is_query_finished) {
+                is_timeout_after_finish =
+                        (current_time - qs_ctx_ptr->query_finish_time) > 
conf_qs_timeout;
+            }
+            qs_status[query_id] = std::make_pair(is_query_finished, 
is_timeout_after_finish);
+        }
+    }
+
+    // 2 report query statistics to fe
+    std::map<TNetworkAddress, bool> rpc_result;
+    for (auto& [addr, qs_map] : fe_qs_map) {
+        rpc_result[addr] = false;
+        // 2.1 get client
+        Status coord_status;
+        FrontendServiceConnection 
coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
+                                        &coord_status);
+        std::stringstream ss;
+        addr.printTo(ss);
+        std::string add_str = ss.str();
+
+        if (!coord_status.ok()) {
+            std::stringstream ss;
+            LOG(WARNING) << "could not get client " << add_str
+                         << " when report workload runtime stats, reason is "
+                         << coord_status.to_string();
+            continue;
+        }
+
+        // 2.2 send report
+        TReportWorkloadRuntimeStatusParams report_runtime_params;
+        report_runtime_params.__set_backend_id(be_id);
+        report_runtime_params.__set_query_statistics_map(qs_map);
+
+        TReportExecStatusParams params;
+        params.report_workload_runtime_status = report_runtime_params;
+
+        TReportExecStatusResult res;
+        Status rpc_status;
+        try {
+            coord->reportExecStatus(res, params);
+            rpc_result[addr] = true;
+        } catch (apache::thrift::TApplicationException& e) {
+            LOG(WARNING) << "fe " << add_str
+                         << " throw exception when report statistics, reason=" 
<< e.what()
+                         << " , you can see fe log for details.";
+        } catch (apache::thrift::transport::TTransportException& e) {
+            LOG(WARNING) << "report workload runtime statistics to " << add_str
+                         << " failed,  err: " << e.what();
+            rpc_status = coord.reopen();
+            if (!rpc_status.ok()) {
+                LOG(WARNING)
+                        << "reopen thrift client failed when report workload 
runtime statistics to"
+                        << add_str;
+            } else {
+                try {
+                    coord->reportExecStatus(res, params);
+                    rpc_result[addr] = true;
+                } catch (apache::thrift::transport::TTransportException& e2) {
+                    LOG(WARNING) << "retry report workload runtime stats to " 
<< add_str
+                                 << " failed,  err: " << e2.what();
+                }
+            }
+        }
+    }
+
+    //  3 when query is finished and (last rpc is send success), remove 
finished query statistics
+    if (fe_qs_map.size() == 0) {
+        return;
+    }
+
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        for (auto& [addr, qs_map] : fe_qs_map) {
+            bool is_rpc_success = rpc_result[addr];
+            for (auto& [query_id, qs] : qs_map) {
+                auto& qs_status_pair = qs_status[query_id];
+                bool is_query_finished = qs_status_pair.first;
+                bool is_timeout_after_finish = qs_status_pair.second;
+                if ((is_rpc_success && is_query_finished) || 
is_timeout_after_finish) {
+                    _query_statistics_ctx_map.erase(query_id);
+                }
+            }
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
+    // NOTE: here must be a write lock
+    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+    // when a query get query_ctx succ, but failed before create node/operator,
+    // it may not register query statistics, so it can not be mark finish
+    if (_query_statistics_ctx_map.find(query_id) != 
_query_statistics_ctx_map.end()) {
+        auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get();
+        qs_ptr->is_query_finished = true;
+        qs_ptr->query_finish_time = MonotonicMillis();
+    }
+}
+
+std::shared_ptr<QueryStatistics> 
RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
+        std::string query_id) {
+    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+    if (_query_statistics_ctx_map.find(query_id) == 
_query_statistics_ctx_map.end()) {
+        return nullptr;
+    }
+    std::shared_ptr<QueryStatistics> qs_ptr = 
std::make_shared<QueryStatistics>();
+    for (auto const& qs : _query_statistics_ctx_map[query_id]->qs_list) {
+        qs_ptr->merge(*qs);
+    }
+    return qs_ptr;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
new file mode 100644
index 00000000000..c4e997d9ffb
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <shared_mutex>
+#include <string>
+
+#include "runtime/query_statistics.h"
+
+namespace doris {
+
+class QueryStatisticsCtx {
+public:
+    QueryStatisticsCtx(TNetworkAddress fe_addr) : fe_addr(fe_addr) {
+        this->is_query_finished = false;
+    }
+    ~QueryStatisticsCtx() = default;
+
+    std::vector<std::shared_ptr<QueryStatistics>> qs_list;
+    bool is_query_finished;
+    TNetworkAddress fe_addr;
+    int64_t query_finish_time;
+};
+
+class RuntimeQueryStatiticsMgr {
+public:
+    RuntimeQueryStatiticsMgr() = default;
+    ~RuntimeQueryStatiticsMgr() = default;
+
+    void register_query_statistics(std::string query_id, 
std::shared_ptr<QueryStatistics> qs_ptr,
+                                   TNetworkAddress fe_addr);
+
+    void report_runtime_query_statistics();
+
+    void set_query_finished(std::string query_id);
+
+    std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string 
query_id);
+
+private:
+    std::shared_mutex _qs_ctx_map_lock;
+    std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 6bdc9aba1d0..d3033ee25d1 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -79,21 +79,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
     }
 }
 
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!_is_pipeline_scan || _should_create_scanner) {
-        statistics->add_scan_bytes(_byte_read_counter->value());
-        statistics->add_scan_rows(_rows_read_counter->value());
-        statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
-    }
-    return Status::OK();
-}
-
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, 
int) {
-    RETURN_IF_ERROR(collect_query_statistics(statistics));
-    return Status::OK();
-}
-
 Status NewOlapScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
     // if you want to add some profile in scan node, even it have not new 
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index db424e17e59..4fd5992bb7e 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -63,8 +63,6 @@ public:
     friend class doris::pipeline::OlapScanOperator;
 
     Status prepare(RuntimeState* state) override;
-    Status collect_query_statistics(QueryStatistics* statistics) override;
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
 
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index c21d4c3dc06..2c311207be7 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1297,6 +1297,9 @@ Status VScanNode::_prepare_scanners(const int 
query_parallel_instance_num) {
     if (scanners.empty()) {
         _eos = true;
     } else {
+        for (auto& scanner : scanners) {
+            scanner->set_query_statistics(_query_statistics.get());
+        }
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(scanners, 
query_parallel_instance_num));
     }
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 2264eab9aa4..82e81bb7ae0 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -66,6 +66,8 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
         }
     }
 
+    int64_t old_scan_rows = _num_rows_read;
+    int64_t old_scan_bytes = _num_byte_read;
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to 
get next rows,
@@ -94,6 +96,11 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
+        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
+    }
+
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
     }
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 3dd57bedb51..7d4b36ff44d 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -32,6 +32,7 @@
 namespace doris {
 class RuntimeProfile;
 class TupleDescriptor;
+class QueryStatistics;
 
 namespace vectorized {
 class VExprContext;
@@ -113,7 +114,11 @@ public:
 
     int64_t get_scanner_wait_worker_timer() { return 
_scanner_wait_worker_timer; }
 
-    void update_scan_cpu_timer() { _scan_cpu_timer += 
_cpu_watch.elapsed_time(); }
+    void update_scan_cpu_timer() {
+        int64_t cpu_time = _cpu_watch.elapsed_time();
+        _scan_cpu_timer += cpu_time;
+        _query_statistics->add_cpu_nanos(cpu_time);
+    }
 
     RuntimeState* runtime_state() { return _state; }
 
@@ -149,6 +154,10 @@ public:
         return true;
     }
 
+    void set_query_statistics(QueryStatistics* query_statistics) {
+        _query_statistics = query_statistics;
+    }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -159,6 +168,7 @@ protected:
 
     RuntimeState* _state;
     VScanNode* _parent;
+    QueryStatistics* _query_statistics = nullptr;
     // Set if scan node has sort limit info
     int64_t _limit = -1;
 
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index dbd78a6033a..6b40af8e946 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -64,10 +64,9 @@ Status VExchangeNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status VExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     DCHECK_GT(_num_senders, 0);
-    _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
     _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, _input_row_desc, state->fragment_instance_id(), _id, 
_num_senders,
-            _runtime_profile.get(), _is_merging, 
_sub_plan_query_statistics_recvr);
+            _runtime_profile.get(), _is_merging);
 
     if (_is_merging) {
         RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, 
_row_descriptor));
@@ -142,20 +141,6 @@ void VExchangeNode::release_resource(RuntimeState* state) {
     ExecNode::release_resource(state);
 }
 
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!statistics->collect_dml_statistics()) {
-        statistics->merge(_sub_plan_query_statistics_recvr.get());
-    }
-    return Status::OK();
-}
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, 
int sender_id) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!statistics->collect_dml_statistics()) {
-        statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
-    }
-    return Status::OK();
-}
 Status VExchangeNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 94302e84d9b..e49eb86a92c 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -32,7 +32,6 @@ namespace doris {
 class DorisNodesInfo;
 class ObjectPool;
 class QueryStatistics;
-class QueryStatisticsRecvr;
 class RuntimeState;
 class TPlanNode;
 
@@ -55,8 +54,6 @@ public:
     Status open(RuntimeState* state) override;
     Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
     void release_resource(RuntimeState* state) override;
-    Status collect_query_statistics(QueryStatistics* statistics) override;
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
     Status close(RuntimeState* state) override;
 
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
@@ -67,7 +64,6 @@ private:
     bool _is_ready;
     std::shared_ptr<VDataStreamRecvr> _stream_recvr;
     RowDescriptor _input_row_desc;
-    std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 1307b0d874f..33dfe81a113 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -57,14 +57,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const 
TUniqueId& fragment_instanc
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
         RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging,
-        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) 
{
+        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging) {
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
               << ", node=" << dest_node_id;
-    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
-            this, state, row_desc, fragment_instance_id, dest_node_id, 
num_senders, is_merging,
-            profile, sub_plan_query_statistics_recvr));
+    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state, 
row_desc,
+                                                                 
fragment_instance_id, dest_node_id,
+                                                                 num_senders, 
is_merging, profile));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     std::lock_guard<std::mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
@@ -132,13 +131,6 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
         return Status::OK();
     }
 
-    // request can only be used before calling recvr's add_batch or when 
request
-    // is the last for the sender, because request maybe released after it's 
batch
-    // is consumed by ExchangeNode.
-    if (request->has_query_statistics()) {
-        recvr->add_sub_plan_statistics(request->query_statistics(), 
request->sender_id());
-    }
-
     bool eos = request->eos();
     if (request->has_block()) {
         RETURN_IF_ERROR(recvr->add_block(request->block(), 
request->sender_id(),
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index ca0e7ab4b74..da0c59c10a9 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -39,7 +39,6 @@ namespace doris {
 class RuntimeState;
 class RowDescriptor;
 class RuntimeProfile;
-class QueryStatisticsRecvr;
 class PTransmitDataParams;
 
 namespace vectorized {
@@ -50,11 +49,11 @@ public:
     VDataStreamMgr();
     ~VDataStreamMgr();
 
-    std::shared_ptr<VDataStreamRecvr> create_recvr(
-            RuntimeState* state, const RowDescriptor& row_desc,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, 
int num_senders,
-            RuntimeProfile* profile, bool is_merging,
-            std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
+    std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+                                                   const RowDescriptor& 
row_desc,
+                                                   const TUniqueId& 
fragment_instance_id,
+                                                   PlanNodeId dest_node_id, 
int num_senders,
+                                                   RuntimeProfile* profile, 
bool is_merging);
 
     std::shared_ptr<VDataStreamRecvr> find_recvr(const TUniqueId& 
fragment_instance_id,
                                                  PlanNodeId node_id, bool 
acquire_lock = true);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 064cb65e1c2..6e25e7160f8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -292,11 +292,10 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(
-        VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& 
row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
-        bool is_merging, RuntimeProfile* profile,
-        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* 
state,
+                                   const RowDescriptor& row_desc,
+                                   const TUniqueId& fragment_instance_id, 
PlanNodeId dest_node_id,
+                                   int num_senders, bool is_merging, 
RuntimeProfile* profile)
         : _mgr(stream_mgr),
 #ifdef USE_MEM_TRACKER
           _query_mem_tracker(state->query_mem_tracker()),
@@ -309,7 +308,6 @@ VDataStreamRecvr::VDataStreamRecvr(
           _is_closed(false),
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
-          _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
           _enable_pipeline(state->enable_pipeline_exec()) {
     // DataStreamRecvr may be destructed after the instance execution thread 
ends.
     _mem_tracker =
@@ -409,12 +407,9 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) 
{
     }
 }
 
-void VDataStreamRecvr::remove_sender(int sender_id, int be_number, 
QueryStatisticsPtr statistics) {
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->decrement_senders(be_number);
-    if (statistics != nullptr) {
-        _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
-    }
 }
 
 void VDataStreamRecvr::cancel_stream(const std::string& msg) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1694f7af516..b92aeadc8b1 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -54,7 +54,6 @@ namespace doris {
 class MemTracker;
 class PBlock;
 class MemTrackerLimiter;
-class PQueryStatistics;
 class RuntimeState;
 
 namespace vectorized {
@@ -65,8 +64,7 @@ class VDataStreamRecvr {
 public:
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const 
RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-                     int num_senders, bool is_merging, RuntimeProfile* profile,
-                     std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
+                     int num_senders, bool is_merging, RuntimeProfile* 
profile);
 
     virtual ~VDataStreamRecvr();
 
@@ -90,13 +88,9 @@ public:
     PlanNodeId dest_node_id() const { return _dest_node_id; }
     const RowDescriptor& row_desc() const { return _row_desc; }
 
-    void add_sub_plan_statistics(const PQueryStatistics& statistics, int 
sender_id) {
-        _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
-    }
-
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
-    void remove_sender(int sender_id, int be_number, QueryStatisticsPtr 
statistics = nullptr);
+    void remove_sender(int sender_id, int be_number);
 
     // We need msg to make sure we can pass existing regression test.
     void cancel_stream(const std::string& msg = "");
@@ -170,8 +164,6 @@ private:
     // Number of blocks received
     RuntimeProfile::Counter* _blocks_produced_counter;
 
-    std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
-
     bool _enable_pipeline;
 };
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 851ba8f2d96..e72879011cb 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -134,8 +134,7 @@ Status Channel::send_local_block(bool eos, const Status& 
exec_status) {
         COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         _local_recvr->add_block(&block, _parent->_sender_id, true);
         if (eos) {
-            _local_recvr->remove_sender(_parent->_sender_id, _be_number,
-                                        _parent->query_statisticsPtr());
+            _local_recvr->remove_sender(_parent->_sender_id, _be_number);
         }
         return Status::OK();
     } else {
@@ -176,10 +175,6 @@ Status Channel::send_remote_block(PBlock* block, bool eos, 
const Status& exec_st
     VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
              << " dest_node=" << _dest_node_id << " to_host=" << 
_brpc_dest_addr.hostname
              << " _packet_seq=" << _packet_seq << " row_desc=" << 
_row_desc.debug_string();
-    if (_is_transfer_chain && (_send_query_statistics_with_every_batch || 
eos)) {
-        auto statistic = _brpc_request.mutable_query_statistics();
-        _parent->_query_statistics->to_pb(statistic);
-    }
 
     _brpc_request.set_eos(eos);
 
@@ -292,8 +287,7 @@ Status Channel::close_internal(const Status& exec_status) {
                 if (!exec_status.ok()) {
                     _local_recvr->cancel_stream(exec_status.msg());
                 }
-                _local_recvr->remove_sender(_parent->_sender_id, _be_number,
-                                            _parent->query_statisticsPtr());
+                _local_recvr->remove_sender(_parent->_sender_id, _be_number);
             }
         } else {
             status = send_remote_block((PBlock*)nullptr, true, exec_status);
@@ -327,8 +321,7 @@ void Channel::ch_roll_pb_block() {
 VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, 
int sender_id,
                                      const RowDescriptor& row_desc, const 
TDataStreamSink& sink,
                                      const 
std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
-                                     bool 
send_query_statistics_with_every_batch)
+                                     int per_channel_buffer_size)
         : _sender_id(sender_id),
           _pool(pool),
           _row_desc(row_desc),
@@ -348,21 +341,17 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
     _enable_pipeline_exec = state->enable_pipeline_exec();
 
     for (int i = 0; i < destinations.size(); ++i) {
-        // Select first dest as transfer chain.
-        bool is_transfer_chain = (i == 0);
         const auto& fragment_instance_id = 
destinations[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
             if (_enable_pipeline_exec) {
                 _channel_shared_ptrs.emplace_back(new PipChannel(
                         this, row_desc, destinations[i].brpc_server, 
fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size, 
is_transfer_chain,
-                        send_query_statistics_with_every_batch));
+                        sink.dest_node_id, per_channel_buffer_size));
             } else {
                 _channel_shared_ptrs.emplace_back(new Channel(
                         this, row_desc, destinations[i].brpc_server, 
fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size, 
is_transfer_chain,
-                        send_query_statistics_with_every_batch));
+                        sink.dest_node_id, per_channel_buffer_size));
             }
             fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                  _channel_shared_ptrs.size() - 
1);
@@ -384,8 +373,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
 VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const 
RowDescriptor& row_desc,
                                      PlanNodeId dest_node_id,
                                      const 
std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
-                                     bool 
send_query_statistics_with_every_batch)
+                                     int per_channel_buffer_size)
         : _sender_id(sender_id),
           _pool(pool),
           _row_desc(row_desc),
@@ -401,8 +389,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int 
sender_id, const RowD
             fragment_id_to_channel_index.end()) {
             _channel_shared_ptrs.emplace_back(
                     new Channel(this, row_desc, destinations[i].brpc_server, 
fragment_instance_id,
-                                _dest_node_id, per_channel_buffer_size, false,
-                                send_query_statistics_with_every_batch));
+                                _dest_node_id, per_channel_buffer_size));
         }
         fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                              _channel_shared_ptrs.size() - 1);
@@ -411,8 +398,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int 
sender_id, const RowD
 }
 
 VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& 
row_desc,
-                                     int per_channel_buffer_size,
-                                     bool 
send_query_statistics_with_every_batch)
+                                     int per_channel_buffer_size)
         : _sender_id(0),
           _pool(pool),
           _row_desc(row_desc),
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 3e164b4dd18..d9365c3f483 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -74,15 +74,14 @@ public:
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                       const RowDescriptor& row_desc, const TDataStreamSink& 
sink,
                       const std::vector<TPlanFragmentDestination>& 
destinations,
-                      int per_channel_buffer_size, bool 
send_query_statistics_with_every_batch);
+                      int per_channel_buffer_size);
 
     VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& 
row_desc,
                       PlanNodeId dest_node_id,
                       const std::vector<TPlanFragmentDestination>& 
destinations,
-                      int per_channel_buffer_size, bool 
send_query_statistics_with_every_batch);
+                      int per_channel_buffer_size);
 
-    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int 
per_channel_buffer_size,
-                      bool send_query_statistics_with_every_batch);
+    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int 
per_channel_buffer_size);
 
     ~VDataStreamSender() override;
 
@@ -106,9 +105,6 @@ public:
 
     const RowDescriptor& row_desc() { return _row_desc; }
 
-    QueryStatistics* query_statistics() { return _query_statistics.get(); }
-    QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
-
 protected:
     friend class Channel;
     friend class PipChannel;
@@ -215,8 +211,7 @@ public:
     // when data is added via add_row() and not sent directly via send_batch().
     Channel(VDataStreamSender* parent, const RowDescriptor& row_desc,
             const TNetworkAddress& brpc_dest, const TUniqueId& 
fragment_instance_id,
-            PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
-            bool send_query_statistics_with_every_batch)
+            PlanNodeId dest_node_id, int buffer_size)
             : _parent(parent),
               _row_desc(row_desc),
               _fragment_instance_id(fragment_instance_id),
@@ -225,9 +220,7 @@ public:
               _packet_seq(0),
               _need_close(false),
               _closed(false),
-              _brpc_dest_addr(brpc_dest),
-              _is_transfer_chain(is_transfer_chain),
-              
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) 
{
+              _brpc_dest_addr(brpc_dest) {
         std::string localhost = BackendOptions::get_localhost();
         _is_local = (_brpc_dest_addr.hostname == localhost) &&
                     (_brpc_dest_addr.port == config::brpc_port);
@@ -367,9 +360,6 @@ protected:
     RefCountClosure<PTransmitDataResult>* _closure = nullptr;
     Status _receiver_status;
     int32_t _brpc_timeout_ms = 500;
-    // whether the dest can be treated as query statistics transfer chain.
-    bool _is_transfer_chain;
-    bool _send_query_statistics_with_every_batch;
     RuntimeState* _state;
 
     bool _is_local;
@@ -417,10 +407,9 @@ class PipChannel final : public Channel {
 public:
     PipChannel(VDataStreamSender* parent, const RowDescriptor& row_desc,
                const TNetworkAddress& brpc_dest, const TUniqueId& 
fragment_instance_id,
-               PlanNodeId dest_node_id, int buffer_size, bool 
is_transfer_chain,
-               bool send_query_statistics_with_every_batch)
-            : Channel(parent, row_desc, brpc_dest, fragment_instance_id, 
dest_node_id, buffer_size,
-                      is_transfer_chain, 
send_query_statistics_with_every_batch) {
+               PlanNodeId dest_node_id, int buffer_size)
+            : Channel(parent, row_desc, brpc_dest, fragment_instance_id, 
dest_node_id,
+                      buffer_size) {
         ch_roll_pb_block();
     }
 
diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp
index c5f4c0358e8..fb0ec06d282 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -39,7 +39,6 @@
 #include "vec/runtime/vfile_result_writer.h"
 
 namespace doris {
-class QueryStatistics;
 class TExpr;
 } // namespace doris
 
@@ -47,7 +46,6 @@ namespace doris::vectorized {
 
 VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& 
row_desc,
                                  const TResultFileSink& sink, int 
per_channel_buffer_size,
-                                 bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr)
         : _t_output_expr(t_output_expr), _row_desc(row_desc) {
     CHECK(sink.__isset.file_options);
@@ -66,7 +64,6 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int 
sender_id, const RowDescr
                                  const TResultFileSink& sink,
                                  const std::vector<TPlanFragmentDestination>& 
destinations,
                                  int per_channel_buffer_size,
-                                 bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr, 
DescriptorTbl& descs)
         : _t_output_expr(t_output_expr),
           
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
@@ -78,8 +75,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int 
sender_id, const RowDescr
     _is_top_sink = false;
     CHECK_EQ(destinations.size(), 1);
     _stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, 
sink.dest_node_id,
-                                               destinations, 
per_channel_buffer_size,
-                                               
send_query_statistics_with_every_batch));
+                                               destinations, 
per_channel_buffer_size));
 
     _name = "VResultFileSink";
     //for impl csv_with_name and csv_with_names_and_types
@@ -168,7 +164,7 @@ Status VResultFileSink::close(RuntimeState* state, Status 
exec_status) {
     if (_is_top_sink) {
         // close sender, this is normal path end
         if (_sender) {
-            _sender->update_num_written_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
+            _sender->update_return_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
             _sender->close(final_status);
         }
         state->exec_env()->result_mgr()->cancel_at_time(
@@ -189,12 +185,4 @@ Status VResultFileSink::close(RuntimeState* state, Status 
exec_status) {
     return Status::OK();
 }
 
-void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-    if (_is_top_sink) {
-        _sender->set_query_statistics(statistics);
-    } else {
-        _stream_sender->set_query_statistics(statistics);
-    }
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/vresult_file_sink.h 
b/be/src/vec/sink/vresult_file_sink.h
index 90bc06bb422..44f7c9bfb5c 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -33,7 +33,6 @@
 namespace doris {
 class BufferControlBlock;
 class ObjectPool;
-class QueryStatistics;
 class RuntimeProfile;
 class RuntimeState;
 class TDataSink;
@@ -47,13 +46,12 @@ class VExprContext;
 class VResultFileSink : public DataSink {
 public:
     VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const 
TResultFileSink& sink,
-                    int per_channel_buffer_size, bool 
send_query_statistics_with_every_batch,
-                    const std::vector<TExpr>& t_output_expr);
+                    int per_channel_buffer_size, const std::vector<TExpr>& 
t_output_expr);
     VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& 
row_desc,
                     const TResultFileSink& sink,
                     const std::vector<TPlanFragmentDestination>& destinations,
-                    int per_channel_buffer_size, bool 
send_query_statistics_with_every_batch,
-                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& 
descs);
+                    int per_channel_buffer_size, const std::vector<TExpr>& 
t_output_expr,
+                    DescriptorTbl& descs);
     ~VResultFileSink() override = default;
     Status init(const TDataSink& thrift_sink) override;
     Status prepare(RuntimeState* state) override;
@@ -66,7 +64,6 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     RuntimeProfile* profile() override { return _profile; }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
     const RowDescriptor& row_desc() const { return _row_desc; }
 
 private:
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index b14b5b0eebb..4c731199d06 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -40,7 +40,6 @@
 #include "vec/sink/vmysql_result_writer.h"
 
 namespace doris {
-class QueryStatistics;
 class RowDescriptor;
 class TExpr;
 
@@ -153,8 +152,7 @@ Status VResultSink::close(RuntimeState* state, Status 
exec_status) {
 
     // close sender, this is normal path end
     if (_sender) {
-        if (_writer) 
_sender->update_num_written_rows(_writer->get_written_rows());
-        _sender->update_max_peak_memory_bytes();
+        if (_writer) _sender->update_return_rows(_writer->get_written_rows());
         _sender->close(final_status);
     }
     state->exec_env()->result_mgr()->cancel_at_time(
@@ -163,9 +161,5 @@ Status VResultSink::close(RuntimeState* state, Status 
exec_status) {
     return DataSink::close(state, exec_status);
 }
 
-void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-    _sender->set_query_statistics(statistics);
-}
-
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index 19cfb3e3b42..b144bd0d22b 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -34,7 +34,6 @@ namespace doris {
 class RuntimeState;
 class RuntimeProfile;
 class BufferControlBlock;
-class QueryStatistics;
 class ResultWriter;
 class RowDescriptor;
 class TExpr;
@@ -137,8 +136,6 @@ public:
     virtual Status close(RuntimeState* state, Status exec_status) override;
     virtual RuntimeProfile* profile() override { return _profile; }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
-
     const RowDescriptor& row_desc() { return _row_desc; }
 
 private:
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp 
b/be/test/vec/runtime/vdata_stream_test.cpp
index 954bf8f194c..0f6dcf18e65 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -165,9 +165,8 @@ TEST_F(VDataStreamTest, BasicTest) {
     int num_senders = 1;
     RuntimeProfile profile("profile");
     bool is_merge = false;
-    std::shared_ptr<QueryStatisticsRecvr> statistics = 
std::make_shared<QueryStatisticsRecvr>();
     auto recv = _instance.create_recvr(&runtime_stat, row_desc, uid, nid, 
num_senders, &profile,
-                                       is_merge, statistics);
+                                       is_merge);
 
     // Test Sender
     int sender_id = 1;
@@ -189,11 +188,8 @@ TEST_F(VDataStreamTest, BasicTest) {
         dests.push_back(dest);
     }
     int per_channel_buffer_size = 1024 * 1024;
-    bool send_query_statistics_with_every_batch = false;
     VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id, 
row_desc, tsink.stream_sink,
-                             dests, per_channel_buffer_size,
-                             send_query_statistics_with_every_batch);
-    sender.set_query_statistics(std::make_shared<QueryStatistics>());
+                             dests, per_channel_buffer_size);
     sender.init(tsink);
     sender.prepare(&runtime_stat);
     sender.open(&runtime_stat);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2da574eb844..eacefe71549 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2257,6 +2257,16 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int publish_topic_info_interval_ms = 30000; // 30s
 
+    @ConfField(mutable = true)
+    public static int workload_runtime_status_thread_interval_ms = 2000;
+
+    // NOTE: it should bigger than be config 
report_query_statistics_interval_ms
+    @ConfField(mutable = true)
+    public static int query_audit_log_timeout_ms = 5000;
+
+    @ConfField(mutable = true)
+    public static int be_report_query_statistics_timeout_ms = 60000;
+
     @ConfField(masterOnly = true, description = {
         "设置 root 用户初始化2阶段 SHA-1 加密密码,默认为'',即不设置 root 密码。"
             + "后续 root 用户的 `set password` 操作会将 root 初始化密码覆盖。"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1fe294b5a6b..67aceac41a2 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -210,6 +210,7 @@ import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.statistics.StatisticsAutoCollector;
@@ -455,6 +456,8 @@ public class Env {
 
     private WorkloadGroupMgr workloadGroupMgr;
 
+    private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
+
     private QueryStats queryStats;
 
     private StatisticsCleaner statisticsCleaner;
@@ -678,6 +681,7 @@ public class Env {
         this.statisticsAutoCollector = new StatisticsAutoCollector();
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
+        this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
         this.queryStats = new QueryStats();
         this.loadManagerAdapter = new LoadManagerAdapter();
         this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -755,6 +759,10 @@ public class Env {
         return workloadGroupMgr;
     }
 
+    public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() {
+        return workloadRuntimeStatusMgr;
+    }
+
     // use this to get correct ClusterInfoService instance
     public static SystemInfoService getCurrentSystemInfo() {
         return getCurrentEnv().getClusterInfo();
@@ -922,6 +930,8 @@ public class Env {
         if (statisticsAutoCollector != null) {
             statisticsAutoCollector.start();
         }
+
+        workloadRuntimeStatusMgr.start();
     }
 
     // wait until FE is ready.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index dbef8321b54..723767971e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -100,6 +100,8 @@ public class AuditEvent {
     @AuditField(value = "FuzzyVariables")
     public String fuzzyVariables = "";
 
+    public long pushToAuditLogQueueTime;
+
     public static class AuditEventBuilder {
 
         private AuditEvent auditEvent = new AuditEvent();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index d914b012884..dcfedc26792 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -120,6 +120,6 @@ public class AuditLogHelper {
                 }
             }
         }
-        
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+        
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index e5e56c63520..3db88e87f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -356,7 +356,7 @@ public class ConnectProcessor {
                 
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
             }
         }
-        
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+        AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, null, true);
     }
 
     // Process COM_QUERY statement,
@@ -439,7 +439,8 @@ public class ConnectProcessor {
                         finalizeCommand();
                     }
                 }
-                auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true);
+                auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(),
+                        true);
                 // execute failed, skip remaining stmts
                 if (ctx.getState().getStateType() == MysqlStateType.ERR) {
                     break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index ada53b54bcd..1cbe41d9107 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -184,6 +184,15 @@ public final class QeProcessorImpl implements QeProcessor {
             LOG.debug("params: {}", params);
         }
         final TReportExecStatusResult result = new TReportExecStatusResult();
+
+        if (params.isSetReportWorkloadRuntimeStatus()) {
+            
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status);
+            if (!params.isSetQueryId()) {
+                result.setStatus(new TStatus(TStatusCode.OK));
+                return result;
+            }
+        }
+
         final QueryInfo info = coordinatorMap.get(params.query_id);
 
         if (info == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
new file mode 100644
index 00000000000..623c3c9aaa9
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadschedpolicy;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class WorkloadRuntimeStatusMgr {
+
+    private static final Logger LOG = 
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
+    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = 
Maps.newConcurrentMap();
+    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
+    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private final ReentrantReadWriteLock queryAuditEventLock = new 
ReentrantReadWriteLock();
+    private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
+
+    class WorkloadRuntimeStatsThread extends Daemon {
+
+        WorkloadRuntimeStatusMgr workloadStatsMgr;
+
+        public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr 
workloadRuntimeStatusMgr, String threadName,
+                int interval) {
+            super(threadName, interval);
+            this.workloadStatsMgr = workloadRuntimeStatusMgr;
+        }
+
+        @Override
+        protected void runOneCycle() {
+            // 1 merge be query statistics
+            Map<String, TQueryStatistics> queryStatisticsMap = 
workloadStatsMgr.getQueryStatisticsMap();
+
+            // 2 log query audit
+            List<AuditEvent> auditEventList = 
workloadStatsMgr.getQueryNeedAudit();
+            for (AuditEvent auditEvent : auditEventList) {
+                TQueryStatistics queryStats = 
queryStatisticsMap.get(auditEvent.queryId);
+                if (queryStats != null) {
+                    auditEvent.scanRows = queryStats.scan_rows;
+                    auditEvent.scanBytes = queryStats.scan_bytes;
+                    auditEvent.peakMemoryBytes = 
queryStats.max_peak_memory_bytes;
+                    auditEvent.cpuTimeMs = queryStats.cpu_ms;
+                }
+                
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
+            }
+
+            // 3 clear beToQueryStatsMap when be report timeout
+            workloadStatsMgr.clearReportTimeoutBeStatistics();
+        }
+
+    }
+
+    private Daemon thread = null;
+
+    public void submitFinishQueryToAudit(AuditEvent event) {
+        queryAuditEventLogWriteLock();
+        try {
+            event.pushToAuditLogQueueTime = System.currentTimeMillis();
+            queryAuditEventList.add(event);
+        } finally {
+            queryAuditEventLogWriteUnlock();
+        }
+    }
+
+    public List<AuditEvent> getQueryNeedAudit() {
+        List<AuditEvent> ret = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        queryAuditEventLogWriteLock();
+        try {
+            int queryAuditLogTimeout = Config.query_audit_log_timeout_ms;
+            Iterator<AuditEvent> iter = queryAuditEventList.iterator();
+            while (iter.hasNext()) {
+                AuditEvent ae = iter.next();
+                if (currentTime - ae.pushToAuditLogQueueTime > 
queryAuditLogTimeout) {
+                    ret.add(ae);
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            queryAuditEventLogWriteUnlock();
+        }
+        return ret;
+    }
+
+    public void start() {
+        thread = new WorkloadRuntimeStatsThread(this, 
"workload-runtime-stats-thread",
+                Config.workload_runtime_status_thread_interval_ms);
+        thread.start();
+    }
+
+    public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
+        if (!params.isSetBackendId()) {
+            LOG.warn("be report workload runtime status but without beid");
+            return;
+        }
+        if (!params.isSetQueryStatisticsMap()) {
+            LOG.warn("be report workload runtime status but without query 
stats map");
+            return;
+        }
+        long beId = params.backend_id;
+        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
+        beLastReportTime.put(beId, System.currentTimeMillis());
+        if (queryIdMap == null) {
+            queryIdMap = Maps.newConcurrentMap();
+            queryIdMap.putAll(params.query_statistics_map);
+            beToQueryStatsMap.put(beId, queryIdMap);
+        } else {
+            long currentTime = System.currentTimeMillis();
+            for (Map.Entry<String, TQueryStatistics> entry : 
params.query_statistics_map.entrySet()) {
+                queryIdMap.put(entry.getKey(), entry.getValue());
+                queryLastReportTime.put(entry.getKey(), currentTime);
+            }
+        }
+    }
+
+    public Map<String, TQueryStatistics> getQueryStatisticsMap() {
+        // 1 merge query stats in all be
+        Set<Long> beIdSet = beToQueryStatsMap.keySet();
+        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        for (Long beId : beIdSet) {
+            Map<String, TQueryStatistics> currentQueryMap = 
beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = currentQueryMap.keySet();
+            for (String queryId : queryIdSet) {
+                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                if (retQuery == null) {
+                    retQuery = new TQueryStatistics();
+                    retQueryMap.put(queryId, retQuery);
+                }
+
+                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
+                mergeQueryStatistics(retQuery, curQueryStats);
+            }
+        }
+
+        return retQueryMap;
+    }
+
+    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics 
src) {
+        dst.scan_rows += src.scan_rows;
+        dst.scan_bytes += src.scan_bytes;
+        dst.cpu_ms += src.cpu_ms;
+        if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
+            dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> beNeedToRemove = new HashSet<>();
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            Long lastReportTime = beLastReportTime.get(beId);
+            if (lastReportTime != null
+                    && currentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                beNeedToRemove.add(beId);
+            }
+        }
+        for (Long beId : beNeedToRemove) {
+            beToQueryStatsMap.remove(beId);
+            beLastReportTime.remove(beId);
+        }
+
+        // 2 clear report timeout query
+        Set<String> queryNeedToClear = new HashSet<>();
+        Long newCurrentTime = System.currentTimeMillis();
+        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
+        for (String queryId : queryLastReportTimeKeySet) {
+            Long lastReportTime = queryLastReportTime.get(queryId);
+            if (lastReportTime != null
+                    && newCurrentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                queryNeedToClear.add(queryId);
+            }
+        }
+
+        Set<Long> beIdSet = beToQueryStatsMap.keySet();
+        for (String queryId : queryNeedToClear) {
+            for (Long beId : beIdSet) {
+                beToQueryStatsMap.get(beId).remove(queryId);
+            }
+            queryLastReportTime.remove(queryId);
+        }
+    }
+
+    private void queryAuditEventLogWriteLock() {
+        queryAuditEventLock.writeLock().lock();
+    }
+
+    private void queryAuditEventLogWriteUnlock() {
+        queryAuditEventLock.writeLock().unlock();
+    }
+}
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index bdaa4ea28e9..910a3763028 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,11 @@ struct TQueryStatistics {
     5: optional i64 max_peak_memory_bytes
 }
 
+struct TReportWorkloadRuntimeStatusParams {
+    1: optional i64 backend_id
+    2: map<string, TQueryStatistics> query_statistics_map
+}
+
 // The results of an INSERT query, sent to the coordinator as part of
 // TReportExecStatusParams
 struct TReportExecStatusParams {
@@ -467,6 +472,8 @@ struct TReportExecStatusParams {
   23: optional list<TDetailedReportParams> detailed_report
 
   24: optional TQueryStatistics query_statistics
+
+  25: TReportWorkloadRuntimeStatusParams report_workload_runtime_status
 }
 
 struct TFeResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to