This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a5a2a56114d [branch-3.0](pick) pick #44169 #44367 (#44394) a5a2a56114d is described below commit a5a2a56114df85a8d74c5084c16b93d99da4824a Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Tue Nov 26 10:20:07 2024 +0800 [branch-3.0](pick) pick #44169 #44367 (#44394) pick #44169 #44367 --- be/src/exprs/runtime_filter.cpp | 69 +++++++++++----------- be/src/exprs/runtime_filter.h | 8 +-- .../local_exchange_sink_operator.cpp | 3 + .../local_exchange/local_exchange_sink_operator.h | 2 + be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/runtime_filter_mgr.cpp | 30 +--------- be/src/runtime/runtime_filter_mgr.h | 6 +- gensrc/proto/internal_service.proto | 2 + 8 files changed, 53 insertions(+), 71 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f8f6b001982..53c0cc45e40 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -994,20 +994,20 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta Status IRuntimeFilter::publish(bool publish_local) { DCHECK(is_producer()); - auto send_to_remote_targets = [&](IRuntimeFilter* filter) { + auto send_to_remote_targets = [&](IRuntimeFilter* filter, uint64_t local_merge_time) { TNetworkAddress addr; DCHECK(_state != nullptr); RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr)); - return filter->push_to_remote(&addr); + return filter->push_to_remote(&addr, local_merge_time); }; - auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper, - bool global) { + auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper, bool global, + uint64_t local_merge_time = 0) { std::vector<std::shared_ptr<IRuntimeFilter>> filters = global ? 
_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id) : _state->local_runtime_filter_mgr()->get_consume_filters(_filter_id); for (auto filter : filters) { filter->_wrapper = wrapper; - filter->update_runtime_filter_type_to_profile(); + filter->update_runtime_filter_type_to_profile(local_merge_time); filter->signal(); } return Status::OK(); @@ -1017,15 +1017,20 @@ Status IRuntimeFilter::publish(bool publish_local) { LocalMergeFilters* local_merge_filters = nullptr; RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( _filter_id, &local_merge_filters)); + local_merge_filters->merge_watcher.start(); std::lock_guard l(*local_merge_filters->lock); RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get())); local_merge_filters->merge_time--; + local_merge_filters->merge_watcher.stop(); if (local_merge_filters->merge_time == 0) { if (_has_local_target) { - RETURN_IF_ERROR( - send_to_local_targets(local_merge_filters->filters[0]->_wrapper, true)); + RETURN_IF_ERROR(send_to_local_targets( + local_merge_filters->filters[0]->_wrapper, true, + local_merge_filters->merge_watcher.elapsed_time())); } else { - RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get())); + RETURN_IF_ERROR(send_to_remote_targets( + local_merge_filters->filters[0].get(), + local_merge_filters->merge_watcher.elapsed_time())); } } } @@ -1039,7 +1044,7 @@ Status IRuntimeFilter::publish(bool publish_local) { RETURN_IF_ERROR(send_to_local_targets(_wrapper, false)); } else if (!publish_local) { if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) { - RETURN_IF_ERROR(send_to_remote_targets(this)); + RETURN_IF_ERROR(send_to_remote_targets(this, 0)); } else { RETURN_IF_ERROR(do_merge()); } @@ -1161,7 +1166,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt return Status::OK(); } -Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { +Status 
IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, uint64_t local_merge_time) { DCHECK(is_producer()); std::shared_ptr<PBackendService_Stub> stub( _state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(*addr)); @@ -1188,6 +1193,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { merge_filter_request->set_filter_id(_filter_id); merge_filter_request->set_is_pipeline(true); + merge_filter_request->set_local_merge_time(local_merge_time); auto column_type = _wrapper->column_type(); RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); merge_filter_callback->cntl_->set_timeout_ms(_state->get_query_ctx()->execution_timeout()); @@ -1222,9 +1228,9 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr } _profile->add_info_string("Info", formatted_state()); // The runtime filter is pushed down, adding filtering information. - auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT); - auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT); - auto* always_true_counter = ADD_COUNTER(_profile, "always_true", TUnit::UNIT); + auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "ExprFilteredRows", TUnit::UNIT); + auto* expr_input_rows_counter = ADD_COUNTER(_profile, "ExprInputRows", TUnit::UNIT); + auto* always_true_counter = ADD_COUNTER(_profile, "AlwaysTruePassRows", TUnit::UNIT); for (auto i = origin_size; i < push_exprs.size(); i++) { push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, always_true_counter); @@ -1244,6 +1250,7 @@ void IRuntimeFilter::update_state() { // In pipelineX, runtime filters will be ready or timeout before open phase. 
if (expected == RuntimeFilterState::NOT_READY) { DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); + COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); _rf_state_atomic = RuntimeFilterState::TIME_OUT; } } @@ -1262,6 +1269,7 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); + COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); _rf_state_atomic.store(RuntimeFilterState::READY); if (!_filter_timer.empty()) { for (auto& timer : _filter_timer) { @@ -1340,18 +1348,19 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size ? options->runtime_bloom_filter_max_size : 0; + auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size; // We build runtime filter by exact distinct count iff three conditions are met: // 1. Only 1 join key - // 2. Do not have remote target (e.g. do not need to merge), or broadcast join - // 3. Bloom filter + // 2. Bloom filter + // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join). 
params.build_bf_exactly = build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; - if (!desc->__isset.sync_filter_size || !desc->sync_filter_size) { - params.build_bf_exactly &= (!_has_remote_target || _is_broadcast_join); + if (!sync_filter_size) { + params.build_bf_exactly &= !_is_broadcast_join; } if (desc->__isset.bloom_filter_size_bytes) { @@ -1499,11 +1508,14 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { _profile_init = true; parent_profile->add_child(_profile.get(), true, nullptr); _profile->add_info_string("Info", formatted_state()); + _wait_timer = ADD_TIMER(_profile, "WaitTime"); } } -void IRuntimeFilter::update_runtime_filter_type_to_profile() { +void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) { _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); + _profile->add_info_string("LocalMergeTime", + std::to_string(local_merge_time / 1000000000.0) + " s"); } std::string IRuntimeFilter::debug_string() const { @@ -1840,24 +1852,9 @@ bool IRuntimeFilter::need_sync_filter_size() { _wrapper->get_build_bf_cardinality() && !_is_broadcast_join; } -Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { - _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); - - if (param->request->has_ignored() && param->request->ignored()) { - set_ignored(); - } else { - std::unique_ptr<RuntimePredicateWrapper> wrapper; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, &wrapper)); - RETURN_IF_ERROR(_wrapper->merge(wrapper.get())); - update_runtime_filter_type_to_profile(); - } - this->signal(); - - return Status::OK(); -} - void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrapper, - int64_t merge_time, int64_t 
start_apply) { + int64_t merge_time, int64_t start_apply, + uint64_t local_merge_time) { _profile->add_info_string("UpdateTime", std::to_string(MonotonicMillis() - start_apply) + " ms"); _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms"); @@ -1867,7 +1864,7 @@ void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrap wrapper->_column_return_type = _wrapper->_column_return_type; } _wrapper = wrapper; - update_runtime_filter_type_to_profile(); + update_runtime_filter_type_to_profile(local_merge_time); signal(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 584d3d4e535..aa18e93ff6b 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -277,9 +277,8 @@ public: std::shared_ptr<RuntimePredicateWrapper>* wrapper); Status change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); - Status update_filter(const UpdateRuntimeFilterParams* param); void update_filter(std::shared_ptr<RuntimePredicateWrapper> filter_wrapper, int64_t merge_time, - int64_t start_apply); + int64_t start_apply, uint64_t local_merge_time); void set_ignored(); @@ -290,13 +289,13 @@ public: bool need_sync_filter_size(); // async push runtimefilter to remote node - Status push_to_remote(const TNetworkAddress* addr); + Status push_to_remote(const TNetworkAddress* addr, uint64_t local_merge_time); void init_profile(RuntimeProfile* parent_profile); std::string debug_string() const; - void update_runtime_filter_type_to_profile(); + void update_runtime_filter_type_to_profile(uint64_t local_merge_time); int filter_id() const { return _filter_id; } @@ -413,6 +412,7 @@ protected: // parent profile // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; + RuntimeProfile::Counter* _wait_timer = nullptr; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index a939d25654b..00fa7f5ae79 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -90,6 +90,9 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo "UseGlobalShuffle", std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle)); } + _profile->add_info_string( + "PartitionExprsSize", + std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num)); _channel_id = info.task_idx; return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 4c4a400c2bd..435f7a410a4 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -91,6 +91,7 @@ public: : Base(sink_id, dest_id, dest_id), _num_partitions(num_partitions), _texprs(texprs), + _partitioned_exprs_num(texprs.size()), _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {} Status init(const TPlanNode& tnode, RuntimeState* state) override { @@ -114,6 +115,7 @@ private: ExchangeType _type; const int _num_partitions; const std::vector<TExpr>& _texprs; + const size_t _partitioned_exprs_num; std::unique_ptr<vectorized::PartitionerBase> _partitioner; const std::map<int, int> _bucket_seq_to_instance_idx; std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 71b6304d428..d3fe8aec1f4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1214,7 +1214,9 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, &filter_wrapper)); std::ranges::for_each(filters, [&](auto& filter) { - 
filter->update_filter(filter_wrapper, request->merge_time(), start_apply); + filter->update_filter( + filter_wrapper, request->merge_time(), start_apply, + request->has_local_merge_time() ? request->local_merge_time() : 0); }); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b11e8290d96..42eacf3a924 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -200,33 +200,6 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc return Status::OK(); } -Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, - butil::IOBufAsZeroCopyInputStream* data) { - SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); - UpdateRuntimeFilterParams params(request, data); - int filter_id = request->filter_id(); - std::vector<std::shared_ptr<IRuntimeFilter>> filters; - // The code is organized for upgrade compatibility to prevent infinite waiting - // old way update filter the code should be deleted after the upgrade is complete. - { - std::lock_guard<std::mutex> l(_lock); - auto iter = _consumer_map.find(filter_id); - if (iter == _consumer_map.end()) { - return Status::InternalError("update_filter meet unknown filter: {}, role: CONSUMER.", - filter_id); - } - for (auto& holder : iter->second) { - filters.emplace_back(holder.filter); - } - iter->second.clear(); - } - for (auto filter : filters) { - RETURN_IF_ERROR(filter->update_filter(&params)); - } - - return Status::OK(); -} - void RuntimeFilterMgr::set_runtime_filter_params( const TRuntimeFilterParams& runtime_filter_params) { std::lock_guard l(_lock); @@ -434,6 +407,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ DCHECK_LE(merged_size, cnt_val->producer_size); cnt_val->merge_time += (MonotonicMillis() - start_merge); merge_time = cnt_val->merge_time; + cnt_val->local_merge_time += + request->has_local_merge_time() ? 
request->local_merge_time() : 0; } if (merged_size == cnt_val->producer_size) { @@ -473,6 +448,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ closure->request_->set_is_pipeline(request->has_is_pipeline() && request->is_pipeline()); closure->request_->set_merge_time(merge_time); + closure->request_->set_local_merge_time(cnt_val->local_merge_time); *closure->request_->mutable_query_id() = request->query_id(); if (has_attachment) { closure->cntl_->request_attachment().append(request_attachment); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index dce051ab0d6..8d9aa2e557a 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -34,6 +34,7 @@ #include "common/object_pool.h" #include "common/status.h" +#include "util/stopwatch.hpp" #include "util/uid_util.h" namespace butil { @@ -60,6 +61,7 @@ struct LocalMergeFilters { int merge_size_times = 0; uint64_t local_merged_size = 0; std::vector<std::shared_ptr<IRuntimeFilter>> filters; + MonotonicStopWatch merge_watcher; }; /// producer: @@ -113,9 +115,6 @@ public: bool build_bf_exactly = false); // update filter by remote - Status update_filter(const PPublishFilterRequest* request, - butil::IOBufAsZeroCopyInputStream* data); - void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); Status get_merge_addr(TNetworkAddress* addr); @@ -188,6 +187,7 @@ public: std::unordered_set<UniqueId> arrive_id; std::vector<PNetworkAddress> source_addrs; std::shared_ptr<ObjectPool> pool; + uint64_t local_merge_time = 0; }; private: diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index f3764cea233..29868e5ff12 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -579,6 +579,7 @@ message PMergeFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional uint64 
local_merge_time = 13; }; message PMergeFilterResponse { @@ -614,6 +615,7 @@ message PPublishFilterRequestV2 { optional bool contain_null = 10; optional bool ignored = 11; repeated int32 fragment_ids = 12; + optional uint64 local_merge_time = 13; }; message PPublishFilterResponse { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org