This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 940fc4e2528 [profile](runtime filter) Add necessary metrics in runtime filter (#44367)
940fc4e2528 is described below
commit 940fc4e25285234093b78d652394f0303ca493b7
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 21 14:42:55 2024 +0800
[profile](runtime filter) Add necessary metrics in runtime filter (#44367)
Currently, the wait time and the local merge time of each runtime filter
are not recorded in the profile. This PR adds these missing metrics.
---
be/src/exprs/runtime_filter.cpp | 61 ++++++++++------------
be/src/exprs/runtime_filter.h | 9 ++--
.../local_exchange_sink_operator.cpp | 3 ++
.../local_exchange/local_exchange_sink_operator.h | 2 +
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/runtime_filter_mgr.cpp | 30 ++---------
be/src/runtime/runtime_filter_mgr.h | 6 +--
gensrc/proto/internal_service.proto | 2 +
8 files changed, 50 insertions(+), 67 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2ca6493dc31..24333360ff6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1001,20 +1001,20 @@ void IRuntimeFilter::insert_batch(const
vectorized::ColumnPtr column, size_t sta
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
DCHECK(is_producer());
- auto send_to_remote_targets = [&](IRuntimeFilter* filter) {
+ auto send_to_remote_targets = [&](IRuntimeFilter* filter, uint64_t
local_merge_time) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
- return filter->push_to_remote(state, &addr);
+ return filter->push_to_remote(state, &addr, local_merge_time);
};
- auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper>
wrapper,
- bool global) {
+ auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper>
wrapper, bool global,
+ uint64_t local_merge_time = 0) {
std::vector<std::shared_ptr<IRuntimeFilter>> filters =
global ?
_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id)
:
_state->local_runtime_filter_mgr()->get_consume_filters(_filter_id);
for (auto filter : filters) {
filter->_wrapper = wrapper;
- filter->update_runtime_filter_type_to_profile();
+ filter->update_runtime_filter_type_to_profile(local_merge_time);
filter->signal();
}
return Status::OK();
@@ -1024,15 +1024,20 @@ Status IRuntimeFilter::publish(RuntimeState* state,
bool publish_local) {
LocalMergeFilters* local_merge_filters = nullptr;
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_filter_id, &local_merge_filters));
+ local_merge_filters->merge_watcher.start();
std::lock_guard l(*local_merge_filters->lock);
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
local_merge_filters->merge_time--;
+ local_merge_filters->merge_watcher.stop();
if (local_merge_filters->merge_time == 0) {
if (_has_local_target) {
- RETURN_IF_ERROR(
-
send_to_local_targets(local_merge_filters->filters[0]->_wrapper, true));
+ RETURN_IF_ERROR(send_to_local_targets(
+ local_merge_filters->filters[0]->_wrapper, true,
+
local_merge_filters->merge_watcher.elapsed_time()));
} else {
-
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
+ RETURN_IF_ERROR(send_to_remote_targets(
+ local_merge_filters->filters[0].get(),
+
local_merge_filters->merge_watcher.elapsed_time()));
}
}
}
@@ -1046,7 +1051,7 @@ Status IRuntimeFilter::publish(RuntimeState* state, bool
publish_local) {
RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
} else if (!publish_local) {
if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() <
USE_NEW_SERDE) {
- RETURN_IF_ERROR(send_to_remote_targets(this));
+ RETURN_IF_ERROR(send_to_remote_targets(this, 0));
} else {
RETURN_IF_ERROR(do_merge());
}
@@ -1176,7 +1181,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
return Status::OK();
}
-Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr,
+ uint64_t local_merge_time) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(*addr));
@@ -1205,6 +1211,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState*
state, const TNetworkAddress
pfragment_instance_id->set_lo((int64_t)this);
merge_filter_request->set_filter_id(_filter_id);
+ merge_filter_request->set_local_merge_time(local_merge_time);
auto column_type = _wrapper->column_type();
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
@@ -1244,9 +1251,9 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
}
_profile->add_info_string("Info", formatted_state());
// The runtime filter is pushed down, adding filtering information.
- auto* expr_filtered_rows_counter = ADD_COUNTER(_profile,
"expr_filtered_rows", TUnit::UNIT);
- auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows",
TUnit::UNIT);
- auto* always_true_counter = ADD_COUNTER(_profile, "always_true_pass_rows",
TUnit::UNIT);
+ auto* expr_filtered_rows_counter = ADD_COUNTER(_profile,
"ExprFilteredRows", TUnit::UNIT);
+ auto* expr_input_rows_counter = ADD_COUNTER(_profile, "ExprInputRows",
TUnit::UNIT);
+ auto* always_true_counter = ADD_COUNTER(_profile, "AlwaysTruePassRows",
TUnit::UNIT);
for (auto i = origin_size; i < push_exprs.size(); i++) {
push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter,
expr_input_rows_counter,
always_true_counter);
@@ -1266,6 +1273,7 @@ void IRuntimeFilter::update_state() {
// In pipelineX, runtime filters will be ready or timeout before open
phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+ COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
_rf_state_atomic = RuntimeFilterState::TIME_OUT;
}
}
@@ -1284,6 +1292,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
+ COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
@@ -1523,11 +1532,14 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", formatted_state());
+ _wait_timer = ADD_TIMER(_profile, "WaitTime");
}
}
-void IRuntimeFilter::update_runtime_filter_type_to_profile() {
+void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t
local_merge_time) {
_profile->add_info_string("RealRuntimeFilterType",
to_string(_wrapper->get_real_type()));
+ _profile->add_info_string("LocalMergeTime",
+ std::to_string(local_merge_time / 1000000000.0)
+ " s");
}
std::string IRuntimeFilter::debug_string() const {
@@ -1864,24 +1876,9 @@ bool IRuntimeFilter::need_sync_filter_size() {
_wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
}
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
- _profile->add_info_string("MergeTime",
std::to_string(param->request->merge_time()) + " ms");
-
- if (param->request->has_ignored() && param->request->ignored()) {
- set_ignored();
- } else {
- std::unique_ptr<RuntimePredicateWrapper> wrapper;
- RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, &wrapper));
- RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
- update_runtime_filter_type_to_profile();
- }
- this->signal();
-
- return Status::OK();
-}
-
void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper>
wrapper,
- int64_t merge_time, int64_t start_apply) {
+ int64_t merge_time, int64_t start_apply,
+ uint64_t local_merge_time) {
_profile->add_info_string("UpdateTime",
std::to_string(MonotonicMillis() - start_apply)
+ " ms");
_profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms");
@@ -1891,7 +1888,7 @@ void
IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrap
wrapper->_column_return_type = _wrapper->_column_return_type;
}
_wrapper = wrapper;
- update_runtime_filter_type_to_profile();
+ update_runtime_filter_type_to_profile(local_merge_time);
signal();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 9d9021a747b..50ee52865be 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -278,9 +278,8 @@ public:
std::shared_ptr<RuntimePredicateWrapper>*
wrapper);
Status change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
- Status update_filter(const UpdateRuntimeFilterParams* param);
void update_filter(std::shared_ptr<RuntimePredicateWrapper>
filter_wrapper, int64_t merge_time,
- int64_t start_apply);
+ int64_t start_apply, uint64_t local_merge_time);
void set_ignored();
@@ -291,13 +290,14 @@ public:
bool need_sync_filter_size();
// async push runtimefilter to remote node
- Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
+ Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
+ uint64_t local_merge_time);
void init_profile(RuntimeProfile* parent_profile);
std::string debug_string() const;
- void update_runtime_filter_type_to_profile();
+ void update_runtime_filter_type_to_profile(uint64_t local_merge_time);
int filter_id() const { return _filter_id; }
@@ -415,6 +415,7 @@ protected:
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
+ RuntimeProfile::Counter* _wait_timer = nullptr;
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 0369cf75834..22007a4b220 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -91,6 +91,9 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
+ _profile->add_info_string(
+ "PartitionExprsSize",
+
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num));
_channel_id = info.task_idx;
return Status::OK();
}
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 4c4a400c2bd..435f7a410a4 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -91,6 +91,7 @@ public:
: Base(sink_id, dest_id, dest_id),
_num_partitions(num_partitions),
_texprs(texprs),
+ _partitioned_exprs_num(texprs.size()),
_bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
@@ -114,6 +115,7 @@ private:
ExchangeType _type;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
+ const size_t _partitioned_exprs_num;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c4f633d84aa..5a8ea2377aa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1245,7 +1245,9 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params,
&filter_wrapper));
std::ranges::for_each(filters, [&](auto& filter) {
- filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
+ filter->update_filter(
+ filter_wrapper, request->merge_time(), start_apply,
+ request->has_local_merge_time() ?
request->local_merge_time() : 0);
});
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 70ce8add789..bb100fcbb42 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -201,33 +201,6 @@ Status RuntimeFilterMgr::register_producer_filter(const
TRuntimeFilterDesc& desc
return Status::OK();
}
-Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
- butil::IOBufAsZeroCopyInputStream*
data) {
- SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
- UpdateRuntimeFilterParams params(request, data);
- int filter_id = request->filter_id();
- std::vector<std::shared_ptr<IRuntimeFilter>> filters;
- // The code is organized for upgrade compatibility to prevent infinite
waiting
- // old way update filter the code should be deleted after the upgrade is
complete.
- {
- std::lock_guard<std::mutex> l(_lock);
- auto iter = _consumer_map.find(filter_id);
- if (iter == _consumer_map.end()) {
- return Status::InternalError("update_filter meet unknown filter:
{}, role: CONSUMER.",
- filter_id);
- }
- for (auto& holder : iter->second) {
- filters.emplace_back(holder.filter);
- }
- iter->second.clear();
- }
- for (auto filter : filters) {
- RETURN_IF_ERROR(filter->update_filter(&params));
- }
-
- return Status::OK();
-}
-
void RuntimeFilterMgr::set_runtime_filter_params(
const TRuntimeFilterParams& runtime_filter_params) {
std::lock_guard l(_lock);
@@ -442,6 +415,8 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
DCHECK_LE(merged_size, cnt_val->producer_size);
cnt_val->merge_time += (MonotonicMillis() - start_merge);
merge_time = cnt_val->merge_time;
+ cnt_val->local_merge_time +=
+ request->has_local_merge_time() ? request->local_merge_time()
: 0;
}
if (merged_size == cnt_val->producer_size) {
@@ -481,6 +456,7 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
closure->request_->set_filter_id(request->filter_id());
closure->request_->set_merge_time(merge_time);
+ closure->request_->set_local_merge_time(cnt_val->local_merge_time);
*closure->request_->mutable_query_id() = request->query_id();
if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 83c526c31a9..0a6f8318fea 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -34,6 +34,7 @@
#include "common/object_pool.h"
#include "common/status.h"
+#include "util/stopwatch.hpp"
#include "util/uid_util.h"
namespace butil {
@@ -60,6 +61,7 @@ struct LocalMergeFilters {
int merge_size_times = 0;
uint64_t local_merged_size = 0;
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
+ MonotonicStopWatch merge_watcher;
};
/// producer:
@@ -113,9 +115,6 @@ public:
bool build_bf_exactly = false);
// update filter by remote
- Status update_filter(const PPublishFilterRequest* request,
- butil::IOBufAsZeroCopyInputStream* data);
-
void set_runtime_filter_params(const TRuntimeFilterParams&
runtime_filter_params);
Status get_merge_addr(TNetworkAddress* addr);
@@ -189,6 +188,7 @@ public:
std::unordered_set<UniqueId> arrive_id;
std::vector<PNetworkAddress> source_addrs;
std::shared_ptr<ObjectPool> pool;
+ uint64_t local_merge_time = 0;
};
private:
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 6a933945933..8db8bab16cd 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -594,6 +594,7 @@ message PMergeFilterRequest {
optional PColumnType column_type = 10;
optional bool contain_null = 11;
optional bool ignored = 12;
+ optional uint64 local_merge_time = 13;
};
message PMergeFilterResponse {
@@ -629,6 +630,7 @@ message PPublishFilterRequestV2 {
optional bool contain_null = 10;
optional bool ignored = 11;
repeated int32 fragment_ids = 12;
+ optional uint64 local_merge_time = 13;
};
message PPublishFilterResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]