This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 8a65b59edba [cherry-pick] Cherry-pick report runtime workload
statistics (#30717)
8a65b59edba is described below
commit 8a65b59edba13c1a4d3faad7ce27ae9850019731
Author: wangbo <[email protected]>
AuthorDate: Sun Feb 4 09:19:03 2024 +0800
[cherry-pick] Cherry-pick report runtime workload statistics (#30717)
* [Feature](profile)Support report runtime workload statistics #29591
[feature](auditlog)Add runtime cpu time/peak memory metric #29925
[Fix](executor)Fix Grayscale upgrade be code dump when report statistics
#29843
* fix ut FragmentMgrTest.normal
---
be/src/common/config.cpp | 4 +
be/src/common/config.h | 3 +
be/src/common/daemon.cpp | 18 ++
be/src/common/daemon.h | 2 +
be/src/exec/data_sink.cpp | 40 +---
be/src/exec/data_sink.h | 8 -
be/src/exec/exec_node.cpp | 21 +-
be/src/exec/exec_node.h | 11 +-
be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 -
be/src/pipeline/exec/exchange_sink_buffer.h | 4 -
be/src/pipeline/exec/exchange_sink_operator.cpp | 1 -
be/src/pipeline/exec/operator.h | 21 --
be/src/pipeline/pipeline.h | 7 -
be/src/pipeline/pipeline_fragment_context.cpp | 6 +-
be/src/pipeline/pipeline_fragment_context.h | 11 -
be/src/pipeline/pipeline_task.cpp | 38 +---
be/src/pipeline/pipeline_task.h | 4 -
be/src/runtime/buffer_control_block.cpp | 4 +-
be/src/runtime/buffer_control_block.h | 19 +-
be/src/runtime/exec_env.h | 6 +
be/src/runtime/exec_env_init.cpp | 3 +
be/src/runtime/fragment_mgr.cpp | 22 +-
be/src/runtime/fragment_mgr.h | 1 -
be/src/runtime/memory/mem_tracker.h | 8 +
be/src/runtime/memory/mem_tracker_limiter.cpp | 6 +
be/src/runtime/plan_fragment_executor.cpp | 76 ++-----
be/src/runtime/plan_fragment_executor.h | 21 +-
be/src/runtime/query_context.h | 47 +++++
be/src/runtime/query_statistics.cpp | 22 +-
be/src/runtime/query_statistics.h | 32 ++-
be/src/runtime/runtime_query_statistics_mgr.cpp | 175 ++++++++++++++++
be/src/runtime/runtime_query_statistics_mgr.h | 59 ++++++
be/src/vec/exec/scan/new_olap_scan_node.cpp | 15 --
be/src/vec/exec/scan/new_olap_scan_node.h | 2 -
be/src/vec/exec/scan/vscan_node.cpp | 3 +
be/src/vec/exec/scan/vscanner.cpp | 7 +
be/src/vec/exec/scan/vscanner.h | 12 +-
be/src/vec/exec/vexchange_node.cpp | 17 +-
be/src/vec/exec/vexchange_node.h | 4 -
be/src/vec/runtime/vdata_stream_mgr.cpp | 16 +-
be/src/vec/runtime/vdata_stream_mgr.h | 11 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 15 +-
be/src/vec/runtime/vdata_stream_recvr.h | 12 +-
be/src/vec/sink/vdata_stream_sender.cpp | 30 +--
be/src/vec/sink/vdata_stream_sender.h | 27 +--
be/src/vec/sink/vresult_file_sink.cpp | 16 +-
be/src/vec/sink/vresult_file_sink.h | 9 +-
be/src/vec/sink/vresult_sink.cpp | 8 +-
be/src/vec/sink/vresult_sink.h | 3 -
be/test/vec/runtime/vdata_stream_test.cpp | 8 +-
.../main/java/org/apache/doris/common/Config.java | 10 +
.../main/java/org/apache/doris/catalog/Env.java | 10 +
.../java/org/apache/doris/plugin/AuditEvent.java | 2 +
.../java/org/apache/doris/qe/AuditLogHelper.java | 2 +-
.../java/org/apache/doris/qe/ConnectProcessor.java | 5 +-
.../java/org/apache/doris/qe/QeProcessorImpl.java | 9 +
.../WorkloadRuntimeStatusMgr.java | 224 +++++++++++++++++++++
gensrc/thrift/FrontendService.thrift | 7 +
58 files changed, 752 insertions(+), 440 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8524be30f44..6926b336181 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1123,6 +1123,10 @@ DEFINE_mBool(enable_column_type_check, "true");
// Tolerance for the number of partition id 0 in rowset, default 0
DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");
+DEFINE_mInt32(report_query_statistics_interval_ms, "3000");
+// 30s
+DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 65a3441033f..7f4d1c6b636 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1178,6 +1178,9 @@ DECLARE_mBool(enable_column_type_check);
// Tolerance for the number of partition id 0 in rowset, default 0
DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
+DECLARE_mInt32(report_query_statistics_interval_ms);
+DECLARE_mInt32(query_statistics_reserve_timeout_ms);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c72f7d66ed8..efaba2d861c 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -44,10 +44,13 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/block_spill_manager.h"
+#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/user_function_cache.h"
#include "service/backend_options.h"
@@ -368,6 +371,13 @@ void Daemon::block_spill_gc_thread() {
}
}
+void Daemon::query_runtime_statistics_thread() {
+ while (!_stop_background_threads_latch.wait_for(
+
std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
+ }
+}
+
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
@@ -474,6 +484,11 @@ void Daemon::start() {
st = Thread::create(
"Daemon", "block_spill_gc_thread", [this]() {
this->block_spill_gc_thread(); },
&_block_spill_gc_thread);
+ st = Thread::create(
+ "Daemon", "query_runtime_statistics_thread",
+ [this]() { this->query_runtime_statistics_thread(); },
+ &_query_runtime_statistics_thread);
+
CHECK(st.ok()) << st;
}
@@ -498,6 +513,9 @@ void Daemon::stop() {
if (_block_spill_gc_thread) {
_block_spill_gc_thread->join();
}
+ if (_query_runtime_statistics_thread) {
+ _query_runtime_statistics_thread->join();
+ }
}
} // namespace doris
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 8266a9b9f0c..5ac9abbe2b1 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -50,6 +50,7 @@ private:
void load_channel_tracker_refresh_thread();
void calculate_metrics_thread();
void block_spill_gc_thread();
+ void query_runtime_statistics_thread();
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _tcmalloc_gc_thread;
@@ -58,5 +59,6 @@ private:
scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
scoped_refptr<Thread> _calculate_metrics_thread;
scoped_refptr<Thread> _block_spill_gc_thread;
+ scoped_refptr<Thread> _query_runtime_statistics_thread;
};
} // namespace doris
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c5d54466113..d9c7b7399a2 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -55,14 +55,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (!thrift_sink.__isset.stream_sink) {
return Status::InternalError("Missing data stream sink.");
}
- bool send_query_statistics_with_every_batch =
- params.__isset.send_query_statistics_with_every_batch
- ? params.send_query_statistics_with_every_batch
- : false;
// TODO: figure out good buffer size based on size of output row
- sink->reset(new vectorized::VDataStreamSender(
- state, pool, params.sender_id, row_desc,
thrift_sink.stream_sink,
- params.destinations, 16 * 1024,
send_query_statistics_with_every_batch));
+ sink->reset(new vectorized::VDataStreamSender(state, pool,
params.sender_id, row_desc,
+ thrift_sink.stream_sink,
params.destinations,
+ 16 * 1024));
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(),
thrift_sink.stream_sink));
break;
}
@@ -82,20 +78,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
}
// TODO: figure out good buffer size based on size of output row
- bool send_query_statistics_with_every_batch =
- params.__isset.send_query_statistics_with_every_batch
- ? params.send_query_statistics_with_every_batch
- : false;
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
pool, params.sender_id, row_desc,
thrift_sink.result_file_sink,
- params.destinations, 16 * 1024,
send_query_statistics_with_every_batch,
- output_exprs, desc_tbl));
+ params.destinations, 16 * 1024, output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
- pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
- send_query_statistics_with_every_batch, output_exprs));
+ pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
output_exprs));
}
break;
}
@@ -192,14 +182,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (!thrift_sink.__isset.stream_sink) {
return Status::InternalError("Missing data stream sink.");
}
- bool send_query_statistics_with_every_batch =
- params.__isset.send_query_statistics_with_every_batch
- ? params.send_query_statistics_with_every_batch
- : false;
// TODO: figure out good buffer size based on size of output row
- sink->reset(new vectorized::VDataStreamSender(
- state, pool, local_params.sender_id, row_desc,
thrift_sink.stream_sink,
- params.destinations, 16 * 1024,
send_query_statistics_with_every_batch));
+ sink->reset(new vectorized::VDataStreamSender(state, pool,
local_params.sender_id, row_desc,
+ thrift_sink.stream_sink,
params.destinations,
+ 16 * 1024));
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(),
thrift_sink.stream_sink));
break;
}
@@ -219,20 +205,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
}
// TODO: figure out good buffer size based on size of output row
- bool send_query_statistics_with_every_batch =
- params.__isset.send_query_statistics_with_every_batch
- ? params.send_query_statistics_with_every_batch
- : false;
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
pool, local_params.sender_id, row_desc,
thrift_sink.result_file_sink,
- params.destinations, 16 * 1024,
send_query_statistics_with_every_batch,
- output_exprs, desc_tbl));
+ params.destinations, 16 * 1024, output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
- pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
- send_query_statistics_with_every_batch, output_exprs));
+ pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
output_exprs));
}
break;
}
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index fd59cd1d27c..7f57330e014 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -38,7 +38,6 @@ class RuntimeState;
class TPlanFragmentExecParams;
class RowDescriptor;
class DescriptorTbl;
-class QueryStatistics;
class TDataSink;
class TExpr;
class TPipelineFragmentParams;
@@ -101,19 +100,12 @@ public:
// Returns the runtime profile for the sink.
virtual RuntimeProfile* profile() = 0;
- virtual void set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
- _query_statistics = statistics;
- }
-
protected:
// Set to true after close() has been called. subclasses should check and
set this in
// close().
bool _closed;
std::string _name;
- // Maybe this will be transferred to BufferControlBlock.
- std::shared_ptr<QueryStatistics> _query_statistics;
-
OpentelemetrySpan _span {};
};
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 2ee3f1ba693..4ddd4be9fd1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -74,7 +74,6 @@
#include "vec/utils/util.hpp"
namespace doris {
-class QueryStatistics;
const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
@@ -96,6 +95,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs,
{tnode.output_tuple_id}, {true}));
}
+ _query_statistics = std::make_shared<QueryStatistics>();
}
ExecNode::~ExecNode() = default;
@@ -172,22 +172,6 @@ Status ExecNode::reset(RuntimeState* state) {
return Status::OK();
}
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
- DCHECK(statistics != nullptr);
- for (auto child_node : _children) {
- RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
- }
- return Status::OK();
-}
-
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int
sender_id) {
- DCHECK(statistics != nullptr);
- for (auto child_node : _children) {
- RETURN_IF_ERROR(child_node->collect_query_statistics(statistics,
sender_id));
- }
- return Status::OK();
-}
-
void ExecNode::release_resource(doris::RuntimeState* state) {
if (!_is_resource_released) {
if (_rows_returned_counter != nullptr) {
@@ -271,6 +255,9 @@ Status ExecNode::create_tree_helper(RuntimeState* state,
ObjectPool* pool,
int num_children = tnodes[*node_idx].num_children;
ExecNode* node = nullptr;
RETURN_IF_ERROR(create_node(state, pool, tnodes[*node_idx], descs, &node));
+ if (node != nullptr) {
+
state->get_query_ctx()->register_query_statistics(node->get_query_statistics());
+ }
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index d92f884204b..e688fb7f889 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -157,13 +157,6 @@ public:
// so should be fast.
[[nodiscard]] virtual Status reset(RuntimeState* state);
- // This should be called before close() and after get_next(), it is
responsible for
- // collecting statistics sent with row batch, it can't be called when
prepare() returns
- // error.
- [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics);
-
- [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics,
- int sender_id);
// close() will get called for every exec node, regardless of what else is
called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).
@@ -242,6 +235,8 @@ public:
size_t children_count() const { return _children.size(); }
+ std::shared_ptr<QueryStatistics> get_query_statistics() { return
_query_statistics; }
+
protected:
friend class DataSink;
@@ -334,6 +329,8 @@ protected:
std::atomic<bool> _can_read = false;
+ std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
+
private:
friend class pipeline::OperatorBase;
bool _is_closed;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index a605b7fd04c..93d675876b6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -190,10 +190,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
- if (_statistics && _statistics->collected()) {
- auto statistic = brpc_request->mutable_query_statistics();
- _statistics->to_pb(statistic);
- }
if (request.block) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -251,10 +247,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
if (request.block_holder->get_block()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
- if (_statistics && _statistics->collected()) {
- auto statistic = brpc_request->mutable_query_statistics();
- _statistics->to_pb(statistic);
- }
auto* closure = request.channel->get_closure(id, request.eos,
request.block_holder);
ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index b31e4d33608..4543c4aa6df 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -175,8 +175,6 @@ public:
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
- void set_query_statistics(QueryStatistics* statistics) { _statistics =
statistics; }
-
private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
@@ -215,8 +213,6 @@ private:
inline bool _is_receiver_eof(InstanceLoId id);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
-
- QueryStatistics* _statistics = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index a62e1aedbc8..4b8edbcf118 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -67,7 +67,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
_sink->_sender_id,
_state->be_number(),
_context);
- _sink_buffer->set_query_statistics(_sink->query_statistics());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4a68a95e366..d98b4f16bd4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,14 +163,6 @@ public:
bool is_source() const;
- virtual Status collect_query_statistics(QueryStatistics* statistics) {
return Status::OK(); };
-
- virtual Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) {
- return Status::OK();
- };
-
- virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {};
-
virtual Status init(const TDataSink& tsink) { return Status::OK(); }
// Prepare for running. (e.g. resource allocation, etc.)
@@ -307,9 +299,6 @@ public:
Status finalize(RuntimeState* state) override { return Status::OK(); }
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
return _sink->profile(); }
- void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override {
- _sink->set_query_statistics(statistics);
- }
protected:
NodeType* _sink;
@@ -380,16 +369,6 @@ public:
return _node->runtime_profile();
}
- Status collect_query_statistics(QueryStatistics* statistics) override {
- RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
- return Status::OK();
- }
-
- Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override {
- RETURN_IF_ERROR(_node->collect_query_statistics(statistics,
sender_id));
- return Status::OK();
- }
-
protected:
NodeType* _node;
bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 056c331dd0c..f570865b23f 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -104,12 +104,6 @@ public:
void set_is_root_pipeline() { _is_root_pipeline = true; }
bool is_root_pipeline() const { return _is_root_pipeline; }
- void set_collect_query_statistics_with_every_batch() {
- _collect_query_statistics_with_every_batch = true;
- }
- [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
- return _collect_query_statistics_with_every_batch;
- }
private:
void _init_profile();
@@ -155,7 +149,6 @@ private:
bool _always_can_read = false;
bool _always_can_write = false;
bool _is_root_pipeline = false;
- bool _collect_query_statistics_with_every_batch = false;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 159d9054ae0..f3e1e4ef292 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -323,7 +323,6 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_root_pipeline = fragment_context->add_pipeline();
_root_pipeline->set_is_root_pipeline();
- _root_pipeline->set_collect_query_statistics_with_every_batch();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
@@ -806,7 +805,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
_multi_cast_stream_sink_senders[i].reset(new
vectorized::VDataStreamSender(
_runtime_state.get(), _runtime_state->obj_pool(),
sender_id, row_desc,
thrift_sink.multi_cast_stream_sink.sinks[i],
- thrift_sink.multi_cast_stream_sink.destinations[i], 16 *
1024, false));
+ thrift_sink.multi_cast_stream_sink.destinations[i], 16 *
1024));
// 2. create and set the source operator of
multi_cast_data_stream_source for new pipeline
OperatorBuilderPtr source_op =
@@ -880,8 +879,7 @@ void PipelineFragmentContext::send_report(bool done) {
_fragment_instance_id, _backend_num, _runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this,
std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this,
std::placeholders::_1,
- std::placeholders::_2),
- _dml_query_statistics()});
+ std::placeholders::_2)});
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index a80c77959d0..e39428da835 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,10 +121,6 @@ public:
return _task_group_entity;
}
- void set_query_statistics(std::shared_ptr<QueryStatistics>
query_statistics) {
- _query_statistics = query_statistics;
- }
-
protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink,
RuntimeState* state);
Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -207,13 +203,6 @@ protected:
DescriptorTbl* _desc_tbl = nullptr;
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
- std::shared_ptr<QueryStatistics> _dml_query_statistics() {
- if (_query_statistics && _query_statistics->collect_dml_statistics()) {
- return _query_statistics;
- }
- return nullptr;
- }
- std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index fa443c01b23..4e4c89de881 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -58,11 +58,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
index, RuntimeState*
_fragment_context(fragment_context),
_parent_profile(parent_profile) {
_pipeline_task_watcher.start();
- _query_statistics.reset(new
QueryStatistics(state->query_options().query_type));
- _sink->set_query_statistics(_query_statistics);
- _collect_query_statistics_with_every_batch =
- _pipeline->collect_query_statistics_with_every_batch();
- fragment_context->set_query_statistics(_query_statistics);
}
void PipelineTask::_fresh_profile_counter() {
@@ -194,14 +189,24 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
- SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
+
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+
Defer defer {[&]() {
if (_task_queue) {
_task_queue->update_statistics(this, time_spent);
}
+
+ int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+ _task_cpu_timer->update(delta_cpu_time);
+ auto cpu_qs = query_context()->get_cpu_statistics();
+ if (cpu_qs) {
+ cpu_qs->add_cpu_nanos(delta_cpu_time);
+ }
}};
// The status must be runnable
*eos = false;
@@ -259,10 +264,6 @@ Status PipelineTask::execute(bool* eos) {
*eos = _data_state == SourceState::FINISHED;
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
- if (_data_state == SourceState::FINISHED ||
- _collect_query_statistics_with_every_batch) {
- RETURN_IF_ERROR(_collect_query_statistics());
- }
auto status = _sink->sink(_state, block, _data_state);
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
@@ -289,23 +290,6 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}
-Status PipelineTask::_collect_query_statistics() {
- // The execnode tree of a fragment will be split into multiple pipelines,
we only need to collect the root pipeline.
- if (_pipeline->is_root_pipeline()) {
- // If the current fragment has only one instance, we can collect all
of them;
- // otherwise, we need to collect them based on the sender_id.
- if (_state->num_per_fragment_instances() == 1) {
- _query_statistics->clear();
-
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
- } else {
- _query_statistics->clear();
-
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
-
_state->per_fragment_instance_idx()));
- }
- }
- return Status::OK();
-}
-
Status PipelineTask::try_close() {
if (_try_close_flag) {
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ea78d795006..32fcd58ba05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -340,9 +340,5 @@ private:
int64_t _close_pipeline_time = 0;
RuntimeProfile::Counter* _pip_task_total_timer;
-
- std::shared_ptr<QueryStatistics> _query_statistics;
- Status _collect_query_statistics();
- bool _collect_query_statistics_with_every_batch = false;
};
} // namespace doris::pipeline
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 07c40d2d9b7..62c1763861a 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -95,7 +95,9 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id,
int buffer_size)
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
- _packet_num(0) {}
+ _packet_num(0) {
+ _query_statistics = std::make_unique<QueryStatistics>();
+}
BufferControlBlock::~BufferControlBlock() {
cancel();
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index c2275ac0d17..0bb38c54e00 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -80,11 +80,7 @@ public:
const TUniqueId& fragment_id() const { return _fragment_id; }
- void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
- _query_statistics = statistics;
- }
-
- void update_num_written_rows(int64_t num_rows) {
+ void update_return_rows(int64_t num_rows) {
// _query_statistics may be null when the result sink init failed
// or some other failure.
// and the number of written rows is only needed when all things go
well.
@@ -93,13 +89,6 @@ public:
}
}
- void update_max_peak_memory_bytes() {
- if (_query_statistics != nullptr) {
- int64_t max_peak_memory_bytes =
_query_statistics->calculate_max_peak_memory_bytes();
-
_query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
- }
- }
-
protected:
virtual bool _get_batch_queue_empty() { return _batch_queue.empty(); }
virtual void _update_batch_queue_empty() {}
@@ -126,10 +115,8 @@ protected:
std::deque<GetResultBatchCtx*> _waiting_rpc;
- // It is shared with PlanFragmentExecutor and will be called in two
different
- // threads. But their calls are all at different time, there is no problem
of
- // multithreading access.
- std::shared_ptr<QueryStatistics> _query_statistics;
+ // only used for FE using return rows to check limit
+ std::unique_ptr<QueryStatistics> _query_statistics;
};
class PipBufferControlBlock : public BufferControlBlock {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 2032d788392..d66e3688d81 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -56,6 +56,7 @@ class MemTracker;
class StorageEngine;
class ResultBufferMgr;
class ResultQueueMgr;
+class RuntimeQueryStatiticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class StreamLoadExecutor;
@@ -113,6 +114,10 @@ public:
}
taskgroup::TaskGroupManager* task_group_manager() { return
_task_group_manager; }
+ RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
+ return _runtime_query_statistics_mgr;
+ }
+
// using template to simplify client cache management
template <typename T>
inline ClientCache<T>* get_client_cache() {
@@ -261,6 +266,7 @@ private:
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
+ RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d0fab65a75f..5ff83652a95 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -62,6 +62,7 @@
#include "runtime/result_buffer_mgr.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
+#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/small_file_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
@@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
.set_max_queue_size(config::fragment_pool_queue_size)
.build(&_join_node_thread_pool);
+ _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_task_group_manager = new taskgroup::TaskGroupManager();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
@@ -422,6 +424,7 @@ void ExecEnv::_destroy() {
_brpc_iobuf_block_memory_tracker.reset();
InvertedIndexSearcherCache::reset_global_instance();
+ SAFE_DELETE(_runtime_query_statistics_mgr);
_is_init = false;
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ab915870c24..1529d66def2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -173,8 +173,7 @@ public:
private:
void coordinator_callback(const Status& status, RuntimeProfile* profile,
- RuntimeProfile* load_channel_profile, bool done,
- std::shared_ptr<QueryStatistics>
query_statistics);
+ RuntimeProfile* load_channel_profile, bool done);
// Id of this query
TUniqueId _query_id;
@@ -219,8 +218,7 @@ FragmentExecState::FragmentExecState(const TUniqueId&
query_id,
_query_ctx(std::move(query_ctx)),
_executor(exec_env,
std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
this, std::placeholders::_1,
std::placeholders::_2,
- std::placeholders::_3,
std::placeholders::_4,
- std::placeholders::_5)),
+ std::placeholders::_3,
std::placeholders::_4)),
_set_rsc_info(false),
_timeout_second(-1),
_report_status_cb_impl(report_status_cb_impl) {
@@ -306,15 +304,13 @@ Status FragmentExecState::cancel(const
PPlanFragmentCancelReason& reason, const
// Also, the reported status will always reflect the most recent execution
status,
// including the final status when execution finishes.
void FragmentExecState::coordinator_callback(const Status& status,
RuntimeProfile* profile,
- RuntimeProfile*
load_channel_profile, bool done,
- std::shared_ptr<QueryStatistics>
query_statistics) {
+ RuntimeProfile*
load_channel_profile, bool done) {
_report_status_cb_impl(
{status, profile, load_channel_profile, done, _coord_addr,
_query_id, -1,
_fragment_instance_id, _backend_num, _executor.runtime_state(),
std::bind(&FragmentExecState::update_status, this,
std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, &_executor,
std::placeholders::_1,
- std::placeholders::_2),
- query_statistics});
+ std::placeholders::_2)});
DCHECK(status.ok() || done); // if !status.ok() => done
}
@@ -406,13 +402,6 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
DCHECK(req.runtime_state != nullptr);
- if (req.query_statistics) {
- TQueryStatistics queryStatistics;
- DCHECK(req.query_statistics->collect_dml_statistics());
- req.query_statistics->to_thrift(&queryStatistics);
- params.__set_query_statistics(queryStatistics);
- }
-
if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done &&
req.status.ok()) {
// this is a load plan, and load is not finished, just make a brief
report
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
@@ -746,6 +735,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
query_ctx->query_mem_tracker->enable_print_log_usage();
}
+ query_ctx->register_memory_statistics();
+ query_ctx->register_cpu_statistics();
+
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
taskgroup::TaskGroupInfo task_group_info;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 1394e54f531..f6a37c74102 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -79,7 +79,6 @@ struct ReportStatusRequest {
RuntimeState* runtime_state;
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)>
cancel_fn;
- std::shared_ptr<QueryStatistics> query_statistics;
};
// This class used to manage all the fragment execute in this instance
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 62c78294601..cf2b06d2061 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -33,6 +33,7 @@
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "runtime/query_statistics.h"
#include "util/pretty_printer.h"
namespace doris {
@@ -139,6 +140,9 @@ public:
return;
}
_consumption->add(bytes);
+ if (_query_statistics) {
+
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
+ }
}
void consume_no_update_peak(int64_t bytes) { // need extreme fast
@@ -149,6 +153,8 @@ public:
void set_consumption(int64_t bytes) { _consumption->set(bytes); }
+ std::shared_ptr<QueryStatistics> get_query_statistics() { return
_query_statistics; }
+
public:
virtual Snapshot make_snapshot() const;
// Specify group_num from mem_tracker_pool to generate snapshot.
@@ -181,6 +187,8 @@ protected:
// Iterator into mem_tracker_pool for this object. Stored to have O(1)
remove.
std::list<MemTracker*>::iterator _tracker_group_it;
+
+ std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 15c55e3eca4..ea255552aa2 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -78,6 +78,12 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const
std::string& label, int64_
} else {
_group_num = random() % 999 + 1;
}
+
+ // currently only select/load need runtime query statistics
+ if (_type == Type::LOAD || _type == Type::QUERY) {
+ _query_statistics = std::make_shared<QueryStatistics>();
+ }
+
{
std::lock_guard<std::mutex>
l(mem_tracker_limiter_pool[_group_num].group_lock);
_tracker_limiter_group_it =
mem_tracker_limiter_pool[_group_num].trackers.insert(
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 2792189e8e2..d3f05b32f7c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -82,9 +82,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_closed(false),
_is_report_success(false),
_is_report_on_cancel(true),
- _collect_query_statistics_with_every_batch(false),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
_report_thread_future = _report_thread_promise.get_future();
+ _query_statistics = std::make_shared<QueryStatistics>();
}
PlanFragmentExecutor::~PlanFragmentExecutor() {
@@ -127,6 +127,7 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
_runtime_state->set_query_mem_tracker(query_ctx == nullptr ?
_exec_env->orphan_mem_tracker()
:
query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
+ query_ctx->register_query_statistics(_query_statistics);
SCOPED_ATTACH_TASK(_runtime_state.get());
_runtime_state->runtime_filter_mgr()->init();
@@ -219,11 +220,6 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
if (sink_profile != nullptr) {
profile()->add_child(sink_profile, true, nullptr);
}
-
- _collect_query_statistics_with_every_batch =
- params.__isset.send_query_statistics_with_every_batch
- ? params.send_query_statistics_with_every_batch
- : false;
} else {
// _sink is set to nullptr
_sink.reset(nullptr);
@@ -238,11 +234,6 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
_prepared = true;
-
- _query_statistics.reset(new
QueryStatistics(request.query_options.query_type));
- if (_sink != nullptr) {
- _sink->set_query_statistics(_query_statistics);
- }
return Status::OK();
}
@@ -304,7 +295,13 @@ Status PlanFragmentExecutor::open() {
Status PlanFragmentExecutor::open_vectorized_internal() {
SCOPED_TIMER(profile()->total_time_counter());
{
- SCOPED_CPU_TIMER(_fragment_cpu_timer);
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+ Defer defer {[&]() {
+ int64_t cpu_time = cpu_time_stop_watch.elapsed_time();
+ _fragment_cpu_timer->update(cpu_time);
+ }};
+
RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
RETURN_IF_CANCELLED(_runtime_state);
if (_sink == nullptr) {
@@ -314,15 +311,18 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
doris::vectorized::Block block;
bool eos = false;
+ int64_t old_cpu_time = cpu_time_stop_watch.elapsed_time();
while (!eos) {
+ Defer defer {[&]() {
+ int64_t current_cpu_time = cpu_time_stop_watch.elapsed_time();
+ int64_t delta_time = current_cpu_time - old_cpu_time;
+ _query_statistics->add_cpu_nanos(delta_time);
+ old_cpu_time = current_cpu_time;
+ }};
+
RETURN_IF_CANCELLED(_runtime_state);
RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
- // Collect this plan and sub plan statistics, and send to parent
plan.
- if (_collect_query_statistics_with_every_batch) {
- _collect_query_statistics();
- }
-
if (!eos || block.rows() > 0) {
auto st = _sink->send(runtime_state(), &block);
if (st.is<END_OF_FILE>()) {
@@ -333,7 +333,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
}
}
{
- _collect_query_statistics();
Status status;
{
std::lock_guard<std::mutex> l(_status_lock);
@@ -370,44 +369,6 @@ Status
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
return Status::OK();
}
-void PlanFragmentExecutor::_collect_query_statistics() {
- _query_statistics->clear();
- Status status;
- /// TODO(yxc):
- // The judgment of enable_local_exchange here is a bug, it should not need
to be checked. I will fix this later.
- bool _is_local = false;
- if (_runtime_state->query_options().__isset.enable_local_exchange) {
- _is_local = _runtime_state->query_options().enable_local_exchange;
- }
-
- if (_is_local) {
- if (_runtime_state->num_per_fragment_instances() == 1) {
- status = _plan->collect_query_statistics(_query_statistics.get());
- } else {
- status = _plan->collect_query_statistics(_query_statistics.get(),
-
_runtime_state->per_fragment_instance_idx());
- }
- } else {
- status = _plan->collect_query_statistics(_query_statistics.get());
- }
-
- if (!status.ok()) {
- LOG(INFO) << "collect query statistics failed, st=" << status;
- return;
- }
- _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() /
NANOS_PER_MILLIS);
- if (_runtime_state->backend_id() != -1) {
- _collect_node_statistics();
- }
-}
-
-void PlanFragmentExecutor::_collect_node_statistics() {
- DCHECK(_runtime_state->backend_id() != -1);
- NodeStatistics* node_statistics =
-
_query_statistics->add_nodes_statistics(_runtime_state->backend_id());
-
node_statistics->set_peak_memory(_runtime_state->query_mem_tracker()->peak_consumption());
-}
-
void PlanFragmentExecutor::report_profile() {
SCOPED_ATTACH_TASK(_runtime_state.get());
VLOG_FILE << "report_profile(): instance_id=" <<
_runtime_state->fragment_instance_id();
@@ -487,8 +448,7 @@ void PlanFragmentExecutor::send_report(bool done) {
// but fragments still need to be cancelled (e.g. limit reached), the
coordinator will
// be waiting for a final report and profile.
_report_status_cb(status, _is_report_success ? profile() : nullptr,
- _is_report_success ? load_channel_profile() : nullptr,
done || !status.ok(),
- _dml_query_statistics());
+ _is_report_success ? load_channel_profile() : nullptr,
done || !status.ok());
}
void PlanFragmentExecutor::stop_report_thread() {
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 565c9047c4b..5c048753f8e 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -47,7 +47,6 @@ class DataSink;
class DescriptorTbl;
class ExecEnv;
class ObjectPool;
-class QueryStatistics;
namespace vectorized {
class Block;
@@ -81,8 +80,7 @@ public:
// functions like PrettyPrint() or to_thrift(), neither of which is const
// because they take locks.
using report_status_callback =
- std::function<void(const Status&, RuntimeProfile*,
RuntimeProfile*, bool,
- std::shared_ptr<QueryStatistics>)>;
+ std::function<void(const Status&, RuntimeProfile*,
RuntimeProfile*, bool)>;
// report_status_cb, if !empty(), is used to report the accumulated profile
// information periodically during execution (open() or get_next()).
@@ -198,12 +196,6 @@ private:
RuntimeProfile::Counter* _fragment_cpu_timer;
- // It is shared with BufferControlBlock and will be called in two different
- // threads. But their calls are all at different time, there is no problem
of
- // multithreaded access.
- std::shared_ptr<QueryStatistics> _query_statistics;
- bool _collect_query_statistics_with_every_batch;
-
// Record the cancel information when calling the cancel() method, return
it to FE
PPlanFragmentCancelReason _cancel_reason;
std::string _cancel_msg;
@@ -244,16 +236,7 @@ private:
const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl();
}
- void _collect_query_statistics();
-
- std::shared_ptr<QueryStatistics> _dml_query_statistics() {
- if (_query_statistics && _query_statistics->collect_dml_statistics()) {
- return _query_statistics;
- }
- return nullptr;
- }
-
- void _collect_node_statistics();
+ std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c6b996bcae9..8746483df4c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,6 +33,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
+#include "runtime/runtime_query_statistics_mgr.h"
#include "task_group/task_group.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
@@ -77,6 +78,11 @@ public:
if (_task_group) {
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
}
+ if (_exec_env &&
+ _exec_env
->runtime_query_statistics_mgr()) { // null check only for BE ut
FragmentMgrTest.normal; meaningless otherwise
+
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(query_id));
+ }
}
// Notice. For load fragments, the fragment_num sent by FE has a small
probability of 0.
@@ -174,6 +180,45 @@ public:
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
+ void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
+
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id),
qs,
+
coord_addr);
+ }
+
+ std::shared_ptr<QueryStatistics> get_query_statistics() {
+ return
_exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
+ print_id(query_id));
+ }
+
+ void register_memory_statistics() {
+ if (query_mem_tracker) {
+ std::shared_ptr<QueryStatistics> qs =
query_mem_tracker->get_query_statistics();
+ std::string query_id_str = print_id(query_id);
+ if (qs) {
+ if (_exec_env &&
+ _exec_env->runtime_query_statistics_mgr()) { // for ut
FragmentMgrTest.normal
+
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+ query_id_str, qs, coord_addr);
+ }
+ } else {
+ LOG(INFO) << " query " << query_id_str << " get memory query
statistics failed ";
+ }
+ }
+ }
+
+ void register_cpu_statistics() {
+ if (!_cpu_statistics) {
+ _cpu_statistics = std::make_shared<QueryStatistics>();
+ if (_exec_env &&
+ _exec_env->runtime_query_statistics_mgr()) { // for ut
FragmentMgrTest.normal
+
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+ print_id(query_id), _cpu_statistics, coord_addr);
+ }
+ }
+ }
+
+ std::shared_ptr<QueryStatistics> get_cpu_statistics() { return
_cpu_statistics; }
+
public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
@@ -227,6 +272,8 @@ private:
taskgroup::TaskGroupPtr _task_group;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;
+
+ std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
};
} // namespace doris
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index 7630362f5eb..0a20f6d6e0a 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -22,6 +22,8 @@
#include <memory>
+#include "util/time.h"
+
namespace doris {
void NodeStatistics::merge(const NodeStatistics& other) {
@@ -40,7 +42,13 @@ void NodeStatistics::from_pb(const PNodeStatistics&
node_statistics) {
void QueryStatistics::merge(const QueryStatistics& other) {
scan_rows += other.scan_rows;
scan_bytes += other.scan_bytes;
- cpu_ms += other.cpu_ms;
+ cpu_nanos += other.cpu_nanos;
+
+ int64_t other_peak_mem = other.max_peak_memory_bytes.load();
+ if (other_peak_mem > this->max_peak_memory_bytes) {
+ this->max_peak_memory_bytes = other_peak_mem;
+ }
+
for (auto& other_node_statistics : other._nodes_statistics_map) {
int64_t node_id = other_node_statistics.first;
auto node_statistics = add_nodes_statistics(node_id);
@@ -52,7 +60,6 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
DCHECK(statistics != nullptr);
statistics->set_scan_rows(scan_rows);
statistics->set_scan_bytes(scan_bytes);
- statistics->set_cpu_ms(cpu_ms);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
for (auto iter = _nodes_statistics_map.begin(); iter !=
_nodes_statistics_map.end(); ++iter) {
@@ -64,17 +71,16 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
DCHECK(statistics != nullptr);
- statistics->scan_bytes = scan_bytes;
- statistics->scan_rows = scan_rows;
- statistics->cpu_ms = cpu_ms;
- statistics->returned_rows = returned_rows;
- statistics->max_peak_memory_bytes = max_peak_memory_bytes;
+ statistics->__set_scan_bytes(scan_bytes);
+ statistics->__set_scan_rows(scan_rows);
+ statistics->__set_cpu_ms(cpu_nanos.load() / NANOS_PER_MILLIS);
+ statistics->__set_returned_rows(returned_rows);
+ statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
- cpu_ms = statistics.cpu_ms();
for (auto& p_node_statistics : statistics.nodes_statistics()) {
int64_t node_id = p_node_statistics.node_id();
auto node_statistics = add_nodes_statistics(node_id);
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index 4923e727961..e9df0e338ab 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -59,13 +59,12 @@ private:
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting
it.
class QueryStatistics {
public:
- QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT)
+ QueryStatistics()
: scan_rows(0),
scan_bytes(0),
- cpu_ms(0),
+ cpu_nanos(0),
returned_rows(0),
- max_peak_memory_bytes(0),
- _query_type(query_type) {}
+ max_peak_memory_bytes(0) {}
virtual ~QueryStatistics();
void merge(const QueryStatistics& other);
@@ -74,7 +73,7 @@ public:
void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; }
- void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
+ void add_cpu_nanos(int64_t cpu_nanos) { this->cpu_nanos += cpu_nanos; }
NodeStatistics* add_nodes_statistics(int64_t node_id) {
NodeStatistics* nodeStatistics = nullptr;
@@ -103,9 +102,10 @@ public:
void clearNodeStatistics();
void clear() {
- scan_rows = 0;
- scan_bytes = 0;
- cpu_ms = 0;
+ scan_rows.store(0);
+ scan_bytes.store(0);
+
+ cpu_nanos = 0;
returned_rows = 0;
max_peak_memory_bytes = 0;
clearNodeStatistics();
@@ -119,25 +119,23 @@ public:
bool collected() const { return _collected; }
void set_collected() { _collected = true; }
- // LOAD does not need to collect information on the exchange node.
- bool collect_dml_statistics() { return _query_type == TQueryType::LOAD; }
+ int64_t get_scan_rows() { return scan_rows.load(); }
+ int64_t get_scan_bytes() { return scan_bytes.load(); }
private:
friend class QueryStatisticsRecvr;
- int64_t scan_rows;
- int64_t scan_bytes;
- int64_t cpu_ms;
+ std::atomic<int64_t> scan_rows;
+ std::atomic<int64_t> scan_bytes;
+ std::atomic<int64_t> cpu_nanos;
+
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
- // Maximum memory peak for all backends.
- // only set once by result sink when closing.
- int64_t max_peak_memory_bytes;
+ std::atomic<int64_t> max_peak_memory_bytes;
// The statistics of the query on each backend.
using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
bool _collected = false;
- const TQueryType::type _query_type;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
new file mode 100644
index 00000000000..f9ada4218df
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_query_statistics_mgr.h"
+
+#include <gen_cpp/HeartbeatService_types.h>
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "util/debug_util.h"
+#include "util/time.h"
+
+namespace doris {
+
+void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
+
std::shared_ptr<QueryStatistics> qs_ptr,
+ TNetworkAddress
fe_addr) {
+ std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ if (_query_statistics_ctx_map.find(query_id) ==
_query_statistics_ctx_map.end()) {
+ _query_statistics_ctx_map[query_id] =
std::make_unique<QueryStatisticsCtx>(fe_addr);
+ }
+ _query_statistics_ctx_map.at(query_id)->qs_list.push_back(qs_ptr);
+}
+
+void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
+ int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+ // 1 get query statistics map
+ std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>>
fe_qs_map;
+ std::map<std::string, std::pair<bool, bool>> qs_status; // <finished,
timeout>
+ {
+ std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ int64_t current_time = MonotonicMillis();
+ int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
+ for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+ if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) {
+ std::map<std::string, TQueryStatistics> tmp_map;
+ fe_qs_map[qs_ctx_ptr->fe_addr] = std::move(tmp_map);
+ }
+
+ QueryStatistics tmp_qs;
+ for (auto& qs_ptr : qs_ctx_ptr->qs_list) {
+ tmp_qs.merge(*qs_ptr);
+ }
+ TQueryStatistics ret_t_qs;
+ tmp_qs.to_thrift(&ret_t_qs);
+ fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs;
+
+ bool is_query_finished = qs_ctx_ptr->is_query_finished;
+ bool is_timeout_after_finish = false;
+ if (is_query_finished) {
+ is_timeout_after_finish =
+ (current_time - qs_ctx_ptr->query_finish_time) >
conf_qs_timeout;
+ }
+ qs_status[query_id] = std::make_pair(is_query_finished,
is_timeout_after_finish);
+ }
+ }
+
+ // 2 report query statistics to fe
+ std::map<TNetworkAddress, bool> rpc_result;
+ for (auto& [addr, qs_map] : fe_qs_map) {
+ rpc_result[addr] = false;
+ // 2.1 get client
+ Status coord_status;
+ FrontendServiceConnection
coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
+ &coord_status);
+ std::stringstream ss;
+ addr.printTo(ss);
+ std::string add_str = ss.str();
+
+ if (!coord_status.ok()) {
+ std::stringstream ss;
+ LOG(WARNING) << "could not get client " << add_str
+ << " when report workload runtime stats, reason is "
+ << coord_status.to_string();
+ continue;
+ }
+
+ // 2.2 send report
+ TReportWorkloadRuntimeStatusParams report_runtime_params;
+ report_runtime_params.__set_backend_id(be_id);
+ report_runtime_params.__set_query_statistics_map(qs_map);
+
+ TReportExecStatusParams params;
+ params.report_workload_runtime_status = report_runtime_params;
+
+ TReportExecStatusResult res;
+ Status rpc_status;
+ try {
+ coord->reportExecStatus(res, params);
+ rpc_result[addr] = true;
+ } catch (apache::thrift::TApplicationException& e) {
+ LOG(WARNING) << "fe " << add_str
+ << " throw exception when report statistics, reason="
<< e.what()
+ << " , you can see fe log for details.";
+ } catch (apache::thrift::transport::TTransportException& e) {
+ LOG(WARNING) << "report workload runtime statistics to " << add_str
+ << " failed, err: " << e.what();
+ rpc_status = coord.reopen();
+ if (!rpc_status.ok()) {
+ LOG(WARNING)
+ << "reopen thrift client failed when report workload
runtime statistics to"
+ << add_str;
+ } else {
+ try {
+ coord->reportExecStatus(res, params);
+ rpc_result[addr] = true;
+ } catch (apache::thrift::transport::TTransportException& e2) {
+ LOG(WARNING) << "retry report workload runtime stats to "
<< add_str
+ << " failed, err: " << e2.what();
+ }
+ }
+ }
+ }
+
+ // 3 when the query is finished and the last rpc was sent successfully, remove
finished query statistics
+ if (fe_qs_map.size() == 0) {
+ return;
+ }
+
+ {
+ std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ for (auto& [addr, qs_map] : fe_qs_map) {
+ bool is_rpc_success = rpc_result[addr];
+ for (auto& [query_id, qs] : qs_map) {
+ auto& qs_status_pair = qs_status[query_id];
+ bool is_query_finished = qs_status_pair.first;
+ bool is_timeout_after_finish = qs_status_pair.second;
+ if ((is_rpc_success && is_query_finished) ||
is_timeout_after_finish) {
+ _query_statistics_ctx_map.erase(query_id);
+ }
+ }
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
+ // NOTE: a write lock is required here because the ctx entry is mutated
+ std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+ // when a query gets query_ctx successfully but fails before creating a node/operator,
+ // it may not have registered query statistics, so it cannot be marked finished
+ if (_query_statistics_ctx_map.find(query_id) !=
_query_statistics_ctx_map.end()) {
+ auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get();
+ qs_ptr->is_query_finished = true;
+ qs_ptr->query_finish_time = MonotonicMillis();
+ }
+}
+
+std::shared_ptr<QueryStatistics>
RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
+ std::string query_id) {
+ std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+ if (_query_statistics_ctx_map.find(query_id) ==
_query_statistics_ctx_map.end()) {
+ return nullptr;
+ }
+ std::shared_ptr<QueryStatistics> qs_ptr =
std::make_shared<QueryStatistics>();
+ for (auto const& qs : _query_statistics_ctx_map[query_id]->qs_list) {
+ qs_ptr->merge(*qs);
+ }
+ return qs_ptr;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
new file mode 100644
index 00000000000..c4e997d9ffb
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <shared_mutex>
+#include <string>
+
+#include "runtime/query_statistics.h"
+
+namespace doris {
+
+class QueryStatisticsCtx {
+public:
+ QueryStatisticsCtx(TNetworkAddress fe_addr) : fe_addr(fe_addr) {
+ this->is_query_finished = false;
+ }
+ ~QueryStatisticsCtx() = default;
+
+ std::vector<std::shared_ptr<QueryStatistics>> qs_list;
+ bool is_query_finished;
+ TNetworkAddress fe_addr;
+ int64_t query_finish_time;
+};
+
+class RuntimeQueryStatiticsMgr {
+public:
+ RuntimeQueryStatiticsMgr() = default;
+ ~RuntimeQueryStatiticsMgr() = default;
+
+ void register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
+ TNetworkAddress fe_addr);
+
+ void report_runtime_query_statistics();
+
+ void set_query_finished(std::string query_id);
+
+ std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string
query_id);
+
+private:
+ std::shared_mutex _qs_ctx_map_lock;
+ std::map<std::string, std::unique_ptr<QueryStatisticsCtx>>
_query_statistics_ctx_map;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 6bdc9aba1d0..d3033ee25d1 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -79,21 +79,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const
TPlanNode& tnode,
}
}
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
- RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
- if (!_is_pipeline_scan || _should_create_scanner) {
- statistics->add_scan_bytes(_byte_read_counter->value());
- statistics->add_scan_rows(_rows_read_counter->value());
- statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
- }
- return Status::OK();
-}
-
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics,
int) {
- RETURN_IF_ERROR(collect_query_statistics(statistics));
- return Status::OK();
-}
-
Status NewOlapScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
// if you want to add some profile in scan node, even it have not new
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index db424e17e59..4fd5992bb7e 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -63,8 +63,6 @@ public:
friend class doris::pipeline::OlapScanOperator;
Status prepare(RuntimeState* state) override;
- Status collect_query_statistics(QueryStatistics* statistics) override;
- Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges)
override;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index c21d4c3dc06..2c311207be7 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1297,6 +1297,9 @@ Status VScanNode::_prepare_scanners(const int
query_parallel_instance_num) {
if (scanners.empty()) {
_eos = true;
} else {
+ for (auto& scanner : scanners) {
+ scanner->set_query_statistics(_query_statistics.get());
+ }
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners,
query_parallel_instance_num));
}
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 2264eab9aa4..82e81bb7ae0 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -66,6 +66,8 @@ Status VScanner::get_block(RuntimeState* state, Block* block,
bool* eof) {
}
}
+ int64_t old_scan_rows = _num_rows_read;
+ int64_t old_scan_bytes = _num_byte_read;
{
do {
// if step 2 filter all rows of block, and block will be reused to
get next rows,
@@ -94,6 +96,11 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
_num_rows_read < rows_read_threshold);
}
+ if (_query_statistics) {
+ _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
+ _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
+ }
+
if (state->is_cancelled()) {
return Status::Cancelled("cancelled");
}
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 3dd57bedb51..7d4b36ff44d 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -32,6 +32,7 @@
namespace doris {
class RuntimeProfile;
class TupleDescriptor;
+class QueryStatistics;
namespace vectorized {
class VExprContext;
@@ -113,7 +114,11 @@ public:
int64_t get_scanner_wait_worker_timer() { return
_scanner_wait_worker_timer; }
- void update_scan_cpu_timer() { _scan_cpu_timer +=
_cpu_watch.elapsed_time(); }
+ void update_scan_cpu_timer() {
+ int64_t cpu_time = _cpu_watch.elapsed_time();
+ _scan_cpu_timer += cpu_time;
+ _query_statistics->add_cpu_nanos(cpu_time);
+ }
RuntimeState* runtime_state() { return _state; }
@@ -149,6 +154,10 @@ public:
return true;
}
+ void set_query_statistics(QueryStatistics* query_statistics) {
+ _query_statistics = query_statistics;
+ }
+
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
@@ -159,6 +168,7 @@ protected:
RuntimeState* _state;
VScanNode* _parent;
+ QueryStatistics* _query_statistics = nullptr;
// Set if scan node has sort limit info
int64_t _limit = -1;
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index dbd78a6033a..6b40af8e946 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -64,10 +64,9 @@ Status VExchangeNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
DCHECK_GT(_num_senders, 0);
- _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, _input_row_desc, state->fragment_instance_id(), _id,
_num_senders,
- _runtime_profile.get(), _is_merging,
_sub_plan_query_statistics_recvr);
+ _runtime_profile.get(), _is_merging);
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor,
_row_descriptor));
@@ -142,20 +141,6 @@ void VExchangeNode::release_resource(RuntimeState* state) {
ExecNode::release_resource(state);
}
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
- RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
- if (!statistics->collect_dml_statistics()) {
- statistics->merge(_sub_plan_query_statistics_recvr.get());
- }
- return Status::OK();
-}
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics,
int sender_id) {
- RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
- if (!statistics->collect_dml_statistics()) {
- statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
- }
- return Status::OK();
-}
Status VExchangeNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 94302e84d9b..e49eb86a92c 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -32,7 +32,6 @@ namespace doris {
class DorisNodesInfo;
class ObjectPool;
class QueryStatistics;
-class QueryStatisticsRecvr;
class RuntimeState;
class TPlanNode;
@@ -55,8 +54,6 @@ public:
Status open(RuntimeState* state) override;
Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
void release_resource(RuntimeState* state) override;
- Status collect_query_statistics(QueryStatistics* statistics) override;
- Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
Status close(RuntimeState* state) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }
@@ -67,7 +64,6 @@ private:
bool _is_ready;
std::shared_ptr<VDataStreamRecvr> _stream_recvr;
RowDescriptor _input_row_desc;
- std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
// use in merge sort
size_t _offset;
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 1307b0d874f..33dfe81a113 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -57,14 +57,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const
TUniqueId& fragment_instanc
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging,
- std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
{
+ PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging) {
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
- std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
- this, state, row_desc, fragment_instance_id, dest_node_id,
num_senders, is_merging,
- profile, sub_plan_query_statistics_recvr));
+ std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state,
row_desc,
+
fragment_instance_id, dest_node_id,
+ num_senders,
is_merging, profile));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
@@ -132,13 +131,6 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
return Status::OK();
}
- // request can only be used before calling recvr's add_batch or when
request
- // is the last for the sender, because request maybe released after it's
batch
- // is consumed by ExchangeNode.
- if (request->has_query_statistics()) {
- recvr->add_sub_plan_statistics(request->query_statistics(),
request->sender_id());
- }
-
bool eos = request->eos();
if (request->has_block()) {
RETURN_IF_ERROR(recvr->add_block(request->block(),
request->sender_id(),
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h
b/be/src/vec/runtime/vdata_stream_mgr.h
index ca0e7ab4b74..da0c59c10a9 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -39,7 +39,6 @@ namespace doris {
class RuntimeState;
class RowDescriptor;
class RuntimeProfile;
-class QueryStatisticsRecvr;
class PTransmitDataParams;
namespace vectorized {
@@ -50,11 +49,11 @@ public:
VDataStreamMgr();
~VDataStreamMgr();
- std::shared_ptr<VDataStreamRecvr> create_recvr(
- RuntimeState* state, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders,
- RuntimeProfile* profile, bool is_merging,
- std::shared_ptr<QueryStatisticsRecvr>
sub_plan_query_statistics_recvr);
+ std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+ const RowDescriptor&
row_desc,
+ const TUniqueId&
fragment_instance_id,
+ PlanNodeId dest_node_id,
int num_senders,
+ RuntimeProfile* profile,
bool is_merging);
std::shared_ptr<VDataStreamRecvr> find_recvr(const TUniqueId&
fragment_instance_id,
PlanNodeId node_id, bool
acquire_lock = true);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 064cb65e1c2..6e25e7160f8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -292,11 +292,10 @@ void VDataStreamRecvr::SenderQueue::close() {
_block_queue.clear();
}
-VDataStreamRecvr::VDataStreamRecvr(
- VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor&
row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int
num_senders,
- bool is_merging, RuntimeProfile* profile,
- std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState*
state,
+ const RowDescriptor& row_desc,
+ const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
+ int num_senders, bool is_merging,
RuntimeProfile* profile)
: _mgr(stream_mgr),
#ifdef USE_MEM_TRACKER
_query_mem_tracker(state->query_mem_tracker()),
@@ -309,7 +308,6 @@ VDataStreamRecvr::VDataStreamRecvr(
_is_closed(false),
_profile(profile),
_peak_memory_usage_counter(nullptr),
- _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
_mem_tracker =
@@ -409,12 +407,9 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos)
{
}
}
-void VDataStreamRecvr::remove_sender(int sender_id, int be_number,
QueryStatisticsPtr statistics) {
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->decrement_senders(be_number);
- if (statistics != nullptr) {
- _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
- }
}
void VDataStreamRecvr::cancel_stream(const std::string& msg) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1694f7af516..b92aeadc8b1 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -54,7 +54,6 @@ namespace doris {
class MemTracker;
class PBlock;
class MemTrackerLimiter;
-class PQueryStatistics;
class RuntimeState;
namespace vectorized {
@@ -65,8 +64,7 @@ class VDataStreamRecvr {
public:
VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const
RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId
dest_node_id,
- int num_senders, bool is_merging, RuntimeProfile* profile,
- std::shared_ptr<QueryStatisticsRecvr>
sub_plan_query_statistics_recvr);
+ int num_senders, bool is_merging, RuntimeProfile*
profile);
virtual ~VDataStreamRecvr();
@@ -90,13 +88,9 @@ public:
PlanNodeId dest_node_id() const { return _dest_node_id; }
const RowDescriptor& row_desc() const { return _row_desc; }
- void add_sub_plan_statistics(const PQueryStatistics& statistics, int
sender_id) {
- _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
- }
-
// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
- void remove_sender(int sender_id, int be_number, QueryStatisticsPtr
statistics = nullptr);
+ void remove_sender(int sender_id, int be_number);
// We need msg to make sure we can pass existing regression test.
void cancel_stream(const std::string& msg = "");
@@ -170,8 +164,6 @@ private:
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter;
- std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
-
bool _enable_pipeline;
};
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 851ba8f2d96..e72879011cb 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -134,8 +134,7 @@ Status Channel::send_local_block(bool eos, const Status&
exec_status) {
COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
_local_recvr->add_block(&block, _parent->_sender_id, true);
if (eos) {
- _local_recvr->remove_sender(_parent->_sender_id, _be_number,
- _parent->query_statisticsPtr());
+ _local_recvr->remove_sender(_parent->_sender_id, _be_number);
}
return Status::OK();
} else {
@@ -176,10 +175,6 @@ Status Channel::send_remote_block(PBlock* block, bool eos,
const Status& exec_st
VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id << " to_host=" <<
_brpc_dest_addr.hostname
<< " _packet_seq=" << _packet_seq << " row_desc=" <<
_row_desc.debug_string();
- if (_is_transfer_chain && (_send_query_statistics_with_every_batch ||
eos)) {
- auto statistic = _brpc_request.mutable_query_statistics();
- _parent->_query_statistics->to_pb(statistic);
- }
_brpc_request.set_eos(eos);
@@ -292,8 +287,7 @@ Status Channel::close_internal(const Status& exec_status) {
if (!exec_status.ok()) {
_local_recvr->cancel_stream(exec_status.msg());
}
- _local_recvr->remove_sender(_parent->_sender_id, _be_number,
- _parent->query_statisticsPtr());
+ _local_recvr->remove_sender(_parent->_sender_id, _be_number);
}
} else {
status = send_remote_block((PBlock*)nullptr, true, exec_status);
@@ -327,8 +321,7 @@ void Channel::ch_roll_pb_block() {
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool,
int sender_id,
const RowDescriptor& row_desc, const
TDataStreamSink& sink,
const
std::vector<TPlanFragmentDestination>& destinations,
- int per_channel_buffer_size,
- bool
send_query_statistics_with_every_batch)
+ int per_channel_buffer_size)
: _sender_id(sender_id),
_pool(pool),
_row_desc(row_desc),
@@ -348,21 +341,17 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state,
ObjectPool* pool, int
_enable_pipeline_exec = state->enable_pipeline_exec();
for (int i = 0; i < destinations.size(); ++i) {
- // Select first dest as transfer chain.
- bool is_transfer_chain = (i == 0);
const auto& fragment_instance_id =
destinations[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
if (_enable_pipeline_exec) {
_channel_shared_ptrs.emplace_back(new PipChannel(
this, row_desc, destinations[i].brpc_server,
fragment_instance_id,
- sink.dest_node_id, per_channel_buffer_size,
is_transfer_chain,
- send_query_statistics_with_every_batch));
+ sink.dest_node_id, per_channel_buffer_size));
} else {
_channel_shared_ptrs.emplace_back(new Channel(
this, row_desc, destinations[i].brpc_server,
fragment_instance_id,
- sink.dest_node_id, per_channel_buffer_size,
is_transfer_chain,
- send_query_statistics_with_every_batch));
+ sink.dest_node_id, per_channel_buffer_size));
}
fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
_channel_shared_ptrs.size() -
1);
@@ -384,8 +373,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state,
ObjectPool* pool, int
VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const
RowDescriptor& row_desc,
PlanNodeId dest_node_id,
const
std::vector<TPlanFragmentDestination>& destinations,
- int per_channel_buffer_size,
- bool
send_query_statistics_with_every_batch)
+ int per_channel_buffer_size)
: _sender_id(sender_id),
_pool(pool),
_row_desc(row_desc),
@@ -401,8 +389,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int
sender_id, const RowD
fragment_id_to_channel_index.end()) {
_channel_shared_ptrs.emplace_back(
new Channel(this, row_desc, destinations[i].brpc_server,
fragment_instance_id,
- _dest_node_id, per_channel_buffer_size, false,
- send_query_statistics_with_every_batch));
+ _dest_node_id, per_channel_buffer_size));
}
fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
_channel_shared_ptrs.size() - 1);
@@ -411,8 +398,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int
sender_id, const RowD
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor&
row_desc,
- int per_channel_buffer_size,
- bool
send_query_statistics_with_every_batch)
+ int per_channel_buffer_size)
: _sender_id(0),
_pool(pool),
_row_desc(row_desc),
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 3e164b4dd18..d9365c3f483 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -74,15 +74,14 @@ public:
VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TDataStreamSink&
sink,
const std::vector<TPlanFragmentDestination>&
destinations,
- int per_channel_buffer_size, bool
send_query_statistics_with_every_batch);
+ int per_channel_buffer_size);
VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor&
row_desc,
PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>&
destinations,
- int per_channel_buffer_size, bool
send_query_statistics_with_every_batch);
+ int per_channel_buffer_size);
- VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int
per_channel_buffer_size,
- bool send_query_statistics_with_every_batch);
+ VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int
per_channel_buffer_size);
~VDataStreamSender() override;
@@ -106,9 +105,6 @@ public:
const RowDescriptor& row_desc() { return _row_desc; }
- QueryStatistics* query_statistics() { return _query_statistics.get(); }
- QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
-
protected:
friend class Channel;
friend class PipChannel;
@@ -215,8 +211,7 @@ public:
// when data is added via add_row() and not sent directly via send_batch().
Channel(VDataStreamSender* parent, const RowDescriptor& row_desc,
const TNetworkAddress& brpc_dest, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
- bool send_query_statistics_with_every_batch)
+ PlanNodeId dest_node_id, int buffer_size)
: _parent(parent),
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
@@ -225,9 +220,7 @@ public:
_packet_seq(0),
_need_close(false),
_closed(false),
- _brpc_dest_addr(brpc_dest),
- _is_transfer_chain(is_transfer_chain),
-
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch)
{
+ _brpc_dest_addr(brpc_dest) {
std::string localhost = BackendOptions::get_localhost();
_is_local = (_brpc_dest_addr.hostname == localhost) &&
(_brpc_dest_addr.port == config::brpc_port);
@@ -367,9 +360,6 @@ protected:
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
Status _receiver_status;
int32_t _brpc_timeout_ms = 500;
- // whether the dest can be treated as query statistics transfer chain.
- bool _is_transfer_chain;
- bool _send_query_statistics_with_every_batch;
RuntimeState* _state;
bool _is_local;
@@ -417,10 +407,9 @@ class PipChannel final : public Channel {
public:
PipChannel(VDataStreamSender* parent, const RowDescriptor& row_desc,
const TNetworkAddress& brpc_dest, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int buffer_size, bool
is_transfer_chain,
- bool send_query_statistics_with_every_batch)
- : Channel(parent, row_desc, brpc_dest, fragment_instance_id,
dest_node_id, buffer_size,
- is_transfer_chain,
send_query_statistics_with_every_batch) {
+ PlanNodeId dest_node_id, int buffer_size)
+ : Channel(parent, row_desc, brpc_dest, fragment_instance_id,
dest_node_id,
+ buffer_size) {
ch_roll_pb_block();
}
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index c5f4c0358e8..fb0ec06d282 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -39,7 +39,6 @@
#include "vec/runtime/vfile_result_writer.h"
namespace doris {
-class QueryStatistics;
class TExpr;
} // namespace doris
@@ -47,7 +46,6 @@ namespace doris::vectorized {
VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor&
row_desc,
const TResultFileSink& sink, int
per_channel_buffer_size,
- bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr)
: _t_output_expr(t_output_expr), _row_desc(row_desc) {
CHECK(sink.__isset.file_options);
@@ -66,7 +64,6 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int
sender_id, const RowDescr
const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>&
destinations,
int per_channel_buffer_size,
- bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr,
DescriptorTbl& descs)
: _t_output_expr(t_output_expr),
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
@@ -78,8 +75,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int
sender_id, const RowDescr
_is_top_sink = false;
CHECK_EQ(destinations.size(), 1);
_stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc,
sink.dest_node_id,
- destinations,
per_channel_buffer_size,
-
send_query_statistics_with_every_batch));
+ destinations,
per_channel_buffer_size));
_name = "VResultFileSink";
//for impl csv_with_name and csv_with_names_and_types
@@ -168,7 +164,7 @@ Status VResultFileSink::close(RuntimeState* state, Status
exec_status) {
if (_is_top_sink) {
// close sender, this is normal path end
if (_sender) {
- _sender->update_num_written_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
+ _sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(
@@ -189,12 +185,4 @@ Status VResultFileSink::close(RuntimeState* state, Status
exec_status) {
return Status::OK();
}
-void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
- if (_is_top_sink) {
- _sender->set_query_statistics(statistics);
- } else {
- _stream_sender->set_query_statistics(statistics);
- }
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vresult_file_sink.h
b/be/src/vec/sink/vresult_file_sink.h
index 90bc06bb422..44f7c9bfb5c 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -33,7 +33,6 @@
namespace doris {
class BufferControlBlock;
class ObjectPool;
-class QueryStatistics;
class RuntimeProfile;
class RuntimeState;
class TDataSink;
@@ -47,13 +46,12 @@ class VExprContext;
class VResultFileSink : public DataSink {
public:
VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const
TResultFileSink& sink,
- int per_channel_buffer_size, bool
send_query_statistics_with_every_batch,
- const std::vector<TExpr>& t_output_expr);
+ int per_channel_buffer_size, const std::vector<TExpr>&
t_output_expr);
VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor&
row_desc,
const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
- int per_channel_buffer_size, bool
send_query_statistics_with_every_batch,
- const std::vector<TExpr>& t_output_expr, DescriptorTbl&
descs);
+ int per_channel_buffer_size, const std::vector<TExpr>&
t_output_expr,
+ DescriptorTbl& descs);
~VResultFileSink() override = default;
Status init(const TDataSink& thrift_sink) override;
Status prepare(RuntimeState* state) override;
@@ -66,7 +64,6 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
RuntimeProfile* profile() override { return _profile; }
- void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override;
const RowDescriptor& row_desc() const { return _row_desc; }
private:
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index b14b5b0eebb..4c731199d06 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -40,7 +40,6 @@
#include "vec/sink/vmysql_result_writer.h"
namespace doris {
-class QueryStatistics;
class RowDescriptor;
class TExpr;
@@ -153,8 +152,7 @@ Status VResultSink::close(RuntimeState* state, Status
exec_status) {
// close sender, this is normal path end
if (_sender) {
- if (_writer)
_sender->update_num_written_rows(_writer->get_written_rows());
- _sender->update_max_peak_memory_bytes();
+ if (_writer) _sender->update_return_rows(_writer->get_written_rows());
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(
@@ -163,9 +161,5 @@ Status VResultSink::close(RuntimeState* state, Status
exec_status) {
return DataSink::close(state, exec_status);
}
-void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
- _sender->set_query_statistics(statistics);
-}
-
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index 19cfb3e3b42..b144bd0d22b 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -34,7 +34,6 @@ namespace doris {
class RuntimeState;
class RuntimeProfile;
class BufferControlBlock;
-class QueryStatistics;
class ResultWriter;
class RowDescriptor;
class TExpr;
@@ -137,8 +136,6 @@ public:
virtual Status close(RuntimeState* state, Status exec_status) override;
virtual RuntimeProfile* profile() override { return _profile; }
- void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override;
-
const RowDescriptor& row_desc() { return _row_desc; }
private:
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp
b/be/test/vec/runtime/vdata_stream_test.cpp
index 954bf8f194c..0f6dcf18e65 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -165,9 +165,8 @@ TEST_F(VDataStreamTest, BasicTest) {
int num_senders = 1;
RuntimeProfile profile("profile");
bool is_merge = false;
- std::shared_ptr<QueryStatisticsRecvr> statistics =
std::make_shared<QueryStatisticsRecvr>();
auto recv = _instance.create_recvr(&runtime_stat, row_desc, uid, nid,
num_senders, &profile,
- is_merge, statistics);
+ is_merge);
// Test Sender
int sender_id = 1;
@@ -189,11 +188,8 @@ TEST_F(VDataStreamTest, BasicTest) {
dests.push_back(dest);
}
int per_channel_buffer_size = 1024 * 1024;
- bool send_query_statistics_with_every_batch = false;
VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id,
row_desc, tsink.stream_sink,
- dests, per_channel_buffer_size,
- send_query_statistics_with_every_batch);
- sender.set_query_statistics(std::make_shared<QueryStatistics>());
+ dests, per_channel_buffer_size);
sender.init(tsink);
sender.prepare(&runtime_stat);
sender.open(&runtime_stat);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2da574eb844..eacefe71549 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2257,6 +2257,16 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int publish_topic_info_interval_ms = 30000; // 30s
+ @ConfField(mutable = true)
+ public static int workload_runtime_status_thread_interval_ms = 2000;
+
+ // NOTE: it should be bigger than the BE config
report_query_statistics_interval_ms
+ @ConfField(mutable = true)
+ public static int query_audit_log_timeout_ms = 5000;
+
+ @ConfField(mutable = true)
+ public static int be_report_query_statistics_timeout_ms = 60000;
+
@ConfField(masterOnly = true, description = {
"设置 root 用户初始化2阶段 SHA-1 加密密码,默认为'',即不设置 root 密码。"
+ "后续 root 用户的 `set password` 操作会将 root 初始化密码覆盖。"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1fe294b5a6b..67aceac41a2 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -210,6 +210,7 @@ import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoCollector;
@@ -455,6 +456,8 @@ public class Env {
private WorkloadGroupMgr workloadGroupMgr;
+ private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
+
private QueryStats queryStats;
private StatisticsCleaner statisticsCleaner;
@@ -678,6 +681,7 @@ public class Env {
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
+ this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
this.queryStats = new QueryStats();
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -755,6 +759,10 @@ public class Env {
return workloadGroupMgr;
}
+ public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() {
+ return workloadRuntimeStatusMgr;
+ }
+
// use this to get correct ClusterInfoService instance
public static SystemInfoService getCurrentSystemInfo() {
return getCurrentEnv().getClusterInfo();
@@ -922,6 +930,8 @@ public class Env {
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}
+
+ workloadRuntimeStatusMgr.start();
}
// wait until FE is ready.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index dbef8321b54..723767971e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -100,6 +100,8 @@ public class AuditEvent {
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
+ public long pushToAuditLogQueueTime;
+
public static class AuditEventBuilder {
private AuditEvent auditEvent = new AuditEvent();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index d914b012884..dcfedc26792 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -120,6 +120,6 @@ public class AuditLogHelper {
}
}
}
-
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index e5e56c63520..3db88e87f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -356,7 +356,7 @@ public class ConnectProcessor {
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
}
}
-
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+ AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, null, true);
}
// Process COM_QUERY statement,
@@ -439,7 +439,8 @@ public class ConnectProcessor {
finalizeCommand();
}
}
- auditAfterExec(auditStmt, executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog(), true);
+ auditAfterExec(auditStmt, executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog(),
+ true);
// execute failed, skip remaining stmts
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index ada53b54bcd..1cbe41d9107 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -184,6 +184,15 @@ public final class QeProcessorImpl implements QeProcessor {
LOG.debug("params: {}", params);
}
final TReportExecStatusResult result = new TReportExecStatusResult();
+
+ if (params.isSetReportWorkloadRuntimeStatus()) {
+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status);
+ if (!params.isSetQueryId()) {
+ result.setStatus(new TStatus(TStatusCode.OK));
+ return result;
+ }
+ }
+
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
new file mode 100644
index 00000000000..623c3c9aaa9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadschedpolicy;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class WorkloadRuntimeStatusMgr {
+
+ private static final Logger LOG =
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
+ private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap =
Maps.newConcurrentMap();
+ private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
+ private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+ private final ReentrantReadWriteLock queryAuditEventLock = new
ReentrantReadWriteLock();
+ private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
+
+ class WorkloadRuntimeStatsThread extends Daemon {
+
+ WorkloadRuntimeStatusMgr workloadStatsMgr;
+
+ public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr
workloadRuntimeStatusMgr, String threadName,
+ int interval) {
+ super(threadName, interval);
+ this.workloadStatsMgr = workloadRuntimeStatusMgr;
+ }
+
+ @Override
+ protected void runOneCycle() {
+ // 1 merge be query statistics
+ Map<String, TQueryStatistics> queryStatisticsMap =
workloadStatsMgr.getQueryStatisticsMap();
+
+ // 2 log query audit
+ List<AuditEvent> auditEventList =
workloadStatsMgr.getQueryNeedAudit();
+ for (AuditEvent auditEvent : auditEventList) {
+ TQueryStatistics queryStats =
queryStatisticsMap.get(auditEvent.queryId);
+ if (queryStats != null) {
+ auditEvent.scanRows = queryStats.scan_rows;
+ auditEvent.scanBytes = queryStats.scan_bytes;
+ auditEvent.peakMemoryBytes =
queryStats.max_peak_memory_bytes;
+ auditEvent.cpuTimeMs = queryStats.cpu_ms;
+ }
+
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
+ }
+
+ // 3 clear beToQueryStatsMap when be report timeout
+ workloadStatsMgr.clearReportTimeoutBeStatistics();
+ }
+
+ }
+
+ private Daemon thread = null;
+
+ public void submitFinishQueryToAudit(AuditEvent event) {
+ queryAuditEventLogWriteLock();
+ try {
+ event.pushToAuditLogQueueTime = System.currentTimeMillis();
+ queryAuditEventList.add(event);
+ } finally {
+ queryAuditEventLogWriteUnlock();
+ }
+ }
+
+ public List<AuditEvent> getQueryNeedAudit() {
+ List<AuditEvent> ret = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ queryAuditEventLogWriteLock();
+ try {
+ int queryAuditLogTimeout = Config.query_audit_log_timeout_ms;
+ Iterator<AuditEvent> iter = queryAuditEventList.iterator();
+ while (iter.hasNext()) {
+ AuditEvent ae = iter.next();
+ if (currentTime - ae.pushToAuditLogQueueTime >
queryAuditLogTimeout) {
+ ret.add(ae);
+ iter.remove();
+ } else {
+ break;
+ }
+ }
+ } finally {
+ queryAuditEventLogWriteUnlock();
+ }
+ return ret;
+ }
+
+ public void start() {
+ thread = new WorkloadRuntimeStatsThread(this,
"workload-runtime-stats-thread",
+ Config.workload_runtime_status_thread_interval_ms);
+ thread.start();
+ }
+
+ public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
+ if (!params.isSetBackendId()) {
+ LOG.warn("be report workload runtime status but without beid");
+ return;
+ }
+ if (!params.isSetQueryStatisticsMap()) {
+ LOG.warn("be report workload runtime status but without query
stats map");
+ return;
+ }
+ long beId = params.backend_id;
+ Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
+ beLastReportTime.put(beId, System.currentTimeMillis());
+ if (queryIdMap == null) {
+ queryIdMap = Maps.newConcurrentMap();
+ queryIdMap.putAll(params.query_statistics_map);
+ beToQueryStatsMap.put(beId, queryIdMap);
+ } else {
+ long currentTime = System.currentTimeMillis();
+ for (Map.Entry<String, TQueryStatistics> entry :
params.query_statistics_map.entrySet()) {
+ queryIdMap.put(entry.getKey(), entry.getValue());
+ queryLastReportTime.put(entry.getKey(), currentTime);
+ }
+ }
+ }
+
+ public Map<String, TQueryStatistics> getQueryStatisticsMap() {
+ // 1 merge query stats in all be
+ Set<Long> beIdSet = beToQueryStatsMap.keySet();
+ Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+ for (Long beId : beIdSet) {
+ Map<String, TQueryStatistics> currentQueryMap =
beToQueryStatsMap.get(beId);
+ Set<String> queryIdSet = currentQueryMap.keySet();
+ for (String queryId : queryIdSet) {
+ TQueryStatistics retQuery = retQueryMap.get(queryId);
+ if (retQuery == null) {
+ retQuery = new TQueryStatistics();
+ retQueryMap.put(queryId, retQuery);
+ }
+
+ TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
+ mergeQueryStatistics(retQuery, curQueryStats);
+ }
+ }
+
+ return retQueryMap;
+ }
+
+ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics
src) {
+ dst.scan_rows += src.scan_rows;
+ dst.scan_bytes += src.scan_bytes;
+ dst.cpu_ms += src.cpu_ms;
+ if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
+ dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
+ }
+ }
+
+ void clearReportTimeoutBeStatistics() {
+ // 1 clear report timeout be
+ Set<Long> beNeedToRemove = new HashSet<>();
+ Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+ Long currentTime = System.currentTimeMillis();
+ for (Long beId : currentBeIdSet) {
+ Long lastReportTime = beLastReportTime.get(beId);
+ if (lastReportTime != null
+ && currentTime - lastReportTime >
Config.be_report_query_statistics_timeout_ms) {
+ beNeedToRemove.add(beId);
+ }
+ }
+ for (Long beId : beNeedToRemove) {
+ beToQueryStatsMap.remove(beId);
+ beLastReportTime.remove(beId);
+ }
+
+ // 2 clear report timeout query
+ Set<String> queryNeedToClear = new HashSet<>();
+ Long newCurrentTime = System.currentTimeMillis();
+ Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
+ for (String queryId : queryLastReportTimeKeySet) {
+ Long lastReportTime = queryLastReportTime.get(queryId);
+ if (lastReportTime != null
+ && newCurrentTime - lastReportTime >
Config.be_report_query_statistics_timeout_ms) {
+ queryNeedToClear.add(queryId);
+ }
+ }
+
+ Set<Long> beIdSet = beToQueryStatsMap.keySet();
+ for (String queryId : queryNeedToClear) {
+ for (Long beId : beIdSet) {
+ beToQueryStatsMap.get(beId).remove(queryId);
+ }
+ queryLastReportTime.remove(queryId);
+ }
+ }
+
+ private void queryAuditEventLogWriteLock() {
+ queryAuditEventLock.writeLock().lock();
+ }
+
+ private void queryAuditEventLogWriteUnlock() {
+ queryAuditEventLock.writeLock().unlock();
+ }
+}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index bdaa4ea28e9..910a3763028 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,11 @@ struct TQueryStatistics {
5: optional i64 max_peak_memory_bytes
}
+struct TReportWorkloadRuntimeStatusParams {
+    // id of the reporting backend; receiver must check isSetBackendId()
+    1: optional i64 backend_id
+    // query id -> runtime statistics accumulated on this backend.
+    // explicit `optional` matches field 1 and the FE's isSetQueryStatisticsMap() guard
+    2: optional map<string, TQueryStatistics> query_statistics_map
+}
+
// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
@@ -467,6 +472,8 @@ struct TReportExecStatusParams {
23: optional list<TDetailedReportParams> detailed_report
24: optional TQueryStatistics query_statistics
+
+ 25: TReportWorkloadRuntimeStatusParams report_workload_runtime_status
}
struct TFeResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]