This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a5a2a56114d [branch-3.0](pick) pick #44169 #44367 (#44394)
a5a2a56114d is described below

commit a5a2a56114df85a8d74c5084c16b93d99da4824a
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Tue Nov 26 10:20:07 2024 +0800

    [branch-3.0](pick) pick #44169 #44367 (#44394)
    
    pick #44169 #44367
---
 be/src/exprs/runtime_filter.cpp                    | 69 +++++++++++-----------
 be/src/exprs/runtime_filter.h                      |  8 +--
 .../local_exchange_sink_operator.cpp               |  3 +
 .../local_exchange/local_exchange_sink_operator.h  |  2 +
 be/src/runtime/fragment_mgr.cpp                    |  4 +-
 be/src/runtime/runtime_filter_mgr.cpp              | 30 +---------
 be/src/runtime/runtime_filter_mgr.h                |  6 +-
 gensrc/proto/internal_service.proto                |  2 +
 8 files changed, 53 insertions(+), 71 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f8f6b001982..53c0cc45e40 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -994,20 +994,20 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
 Status IRuntimeFilter::publish(bool publish_local) {
     DCHECK(is_producer());
 
-    auto send_to_remote_targets = [&](IRuntimeFilter* filter) {
+    auto send_to_remote_targets = [&](IRuntimeFilter* filter, uint64_t 
local_merge_time) {
         TNetworkAddress addr;
         DCHECK(_state != nullptr);
         
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
-        return filter->push_to_remote(&addr);
+        return filter->push_to_remote(&addr, local_merge_time);
     };
-    auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> 
wrapper,
-                                     bool global) {
+    auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> 
wrapper, bool global,
+                                     uint64_t local_merge_time = 0) {
         std::vector<std::shared_ptr<IRuntimeFilter>> filters =
                 global ? 
_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id)
                        : 
_state->local_runtime_filter_mgr()->get_consume_filters(_filter_id);
         for (auto filter : filters) {
             filter->_wrapper = wrapper;
-            filter->update_runtime_filter_type_to_profile();
+            filter->update_runtime_filter_type_to_profile(local_merge_time);
             filter->signal();
         }
         return Status::OK();
@@ -1017,15 +1017,20 @@ Status IRuntimeFilter::publish(bool publish_local) {
             LocalMergeFilters* local_merge_filters = nullptr;
             
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
                     _filter_id, &local_merge_filters));
+            local_merge_filters->merge_watcher.start();
             std::lock_guard l(*local_merge_filters->lock);
             
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
             local_merge_filters->merge_time--;
+            local_merge_filters->merge_watcher.stop();
             if (local_merge_filters->merge_time == 0) {
                 if (_has_local_target) {
-                    RETURN_IF_ERROR(
-                            
send_to_local_targets(local_merge_filters->filters[0]->_wrapper, true));
+                    RETURN_IF_ERROR(send_to_local_targets(
+                            local_merge_filters->filters[0]->_wrapper, true,
+                            
local_merge_filters->merge_watcher.elapsed_time()));
                 } else {
-                    
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
+                    RETURN_IF_ERROR(send_to_remote_targets(
+                            local_merge_filters->filters[0].get(),
+                            
local_merge_filters->merge_watcher.elapsed_time()));
                 }
             }
         }
@@ -1039,7 +1044,7 @@ Status IRuntimeFilter::publish(bool publish_local) {
         RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
     } else if (!publish_local) {
         if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < 
USE_NEW_SERDE) {
-            RETURN_IF_ERROR(send_to_remote_targets(this));
+            RETURN_IF_ERROR(send_to_remote_targets(this, 0));
         } else {
             RETURN_IF_ERROR(do_merge());
         }
@@ -1161,7 +1166,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
     return Status::OK();
 }
 
-Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, uint64_t 
local_merge_time) {
     DCHECK(is_producer());
     std::shared_ptr<PBackendService_Stub> stub(
             
_state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(*addr));
@@ -1188,6 +1193,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
 
     merge_filter_request->set_filter_id(_filter_id);
     merge_filter_request->set_is_pipeline(true);
+    merge_filter_request->set_local_merge_time(local_merge_time);
     auto column_type = _wrapper->column_type();
     
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
     
merge_filter_callback->cntl_->set_timeout_ms(_state->get_query_ctx()->execution_timeout());
@@ -1222,9 +1228,9 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
     }
     _profile->add_info_string("Info", formatted_state());
     // The runtime filter is pushed down, adding filtering information.
-    auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, 
"expr_filtered_rows", TUnit::UNIT);
-    auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", 
TUnit::UNIT);
-    auto* always_true_counter = ADD_COUNTER(_profile, "always_true", 
TUnit::UNIT);
+    auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, 
"ExprFilteredRows", TUnit::UNIT);
+    auto* expr_input_rows_counter = ADD_COUNTER(_profile, "ExprInputRows", 
TUnit::UNIT);
+    auto* always_true_counter = ADD_COUNTER(_profile, "AlwaysTruePassRows", 
TUnit::UNIT);
     for (auto i = origin_size; i < push_exprs.size(); i++) {
         push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, 
expr_input_rows_counter,
                                               always_true_counter);
@@ -1244,6 +1250,7 @@ void IRuntimeFilter::update_state() {
     // In pipelineX, runtime filters will be ready or timeout before open 
phase.
     if (expected == RuntimeFilterState::NOT_READY) {
         DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+        COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
         _rf_state_atomic = RuntimeFilterState::TIME_OUT;
     }
 }
@@ -1262,6 +1269,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
+    COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
     _rf_state_atomic.store(RuntimeFilterState::READY);
     if (!_filter_timer.empty()) {
         for (auto& timer : _filter_timer) {
@@ -1340,18 +1348,19 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     params.runtime_bloom_filter_max_size = 
options->__isset.runtime_bloom_filter_max_size
                                                    ? 
options->runtime_bloom_filter_max_size
                                                    : 0;
+    auto sync_filter_size = desc->__isset.sync_filter_size && 
desc->sync_filter_size;
     // We build runtime filter by exact distinct count iff three conditions 
are met:
     // 1. Only 1 join key
-    // 2. Do not have remote target (e.g. do not need to merge), or broadcast 
join
-    // 3. Bloom filter
+    // 2. Bloom filter
+    // 3. Size of all bloom filters will be same (size will be sync or this is 
a broadcast join).
     params.build_bf_exactly =
             build_bf_exactly && (_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER ||
                                  _runtime_filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER);
 
     params.bloom_filter_size_calculated_by_ndv = 
desc->bloom_filter_size_calculated_by_ndv;
 
-    if (!desc->__isset.sync_filter_size || !desc->sync_filter_size) {
-        params.build_bf_exactly &= (!_has_remote_target || _is_broadcast_join);
+    if (!sync_filter_size) {
+        params.build_bf_exactly &= !_is_broadcast_join;
     }
 
     if (desc->__isset.bloom_filter_size_bytes) {
@@ -1499,11 +1508,14 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
         _profile_init = true;
         parent_profile->add_child(_profile.get(), true, nullptr);
         _profile->add_info_string("Info", formatted_state());
+        _wait_timer = ADD_TIMER(_profile, "WaitTime");
     }
 }
 
-void IRuntimeFilter::update_runtime_filter_type_to_profile() {
+void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t 
local_merge_time) {
     _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
+    _profile->add_info_string("LocalMergeTime",
+                              std::to_string(local_merge_time / 1000000000.0) 
+ " s");
 }
 
 std::string IRuntimeFilter::debug_string() const {
@@ -1840,24 +1852,9 @@ bool IRuntimeFilter::need_sync_filter_size() {
            _wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
 }
 
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
-    _profile->add_info_string("MergeTime", 
std::to_string(param->request->merge_time()) + " ms");
-
-    if (param->request->has_ignored() && param->request->ignored()) {
-        set_ignored();
-    } else {
-        std::unique_ptr<RuntimePredicateWrapper> wrapper;
-        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, &wrapper));
-        RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
-        update_runtime_filter_type_to_profile();
-    }
-    this->signal();
-
-    return Status::OK();
-}
-
 void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> 
wrapper,
-                                   int64_t merge_time, int64_t start_apply) {
+                                   int64_t merge_time, int64_t start_apply,
+                                   uint64_t local_merge_time) {
     _profile->add_info_string("UpdateTime",
                               std::to_string(MonotonicMillis() - start_apply) 
+ " ms");
     _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms");
@@ -1867,7 +1864,7 @@ void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrap
         wrapper->_column_return_type = _wrapper->_column_return_type;
     }
     _wrapper = wrapper;
-    update_runtime_filter_type_to_profile();
+    update_runtime_filter_type_to_profile(local_merge_time);
     signal();
 }
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 584d3d4e535..aa18e93ff6b 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -277,9 +277,8 @@ public:
                                  std::shared_ptr<RuntimePredicateWrapper>* 
wrapper);
     Status change_to_bloom_filter();
     Status init_bloom_filter(const size_t build_bf_cardinality);
-    Status update_filter(const UpdateRuntimeFilterParams* param);
     void update_filter(std::shared_ptr<RuntimePredicateWrapper> 
filter_wrapper, int64_t merge_time,
-                       int64_t start_apply);
+                       int64_t start_apply, uint64_t local_merge_time);
 
     void set_ignored();
 
@@ -290,13 +289,13 @@ public:
     bool need_sync_filter_size();
 
     // async push runtimefilter to remote node
-    Status push_to_remote(const TNetworkAddress* addr);
+    Status push_to_remote(const TNetworkAddress* addr, uint64_t 
local_merge_time);
 
     void init_profile(RuntimeProfile* parent_profile);
 
     std::string debug_string() const;
 
-    void update_runtime_filter_type_to_profile();
+    void update_runtime_filter_type_to_profile(uint64_t local_merge_time);
 
     int filter_id() const { return _filter_id; }
 
@@ -413,6 +412,7 @@ protected:
     // parent profile
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
+    RuntimeProfile::Counter* _wait_timer = nullptr;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index a939d25654b..00fa7f5ae79 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -90,6 +90,9 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
                 "UseGlobalShuffle",
                 
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
     }
+    _profile->add_info_string(
+            "PartitionExprsSize",
+            
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num));
     _channel_id = info.task_idx;
     return Status::OK();
 }
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 4c4a400c2bd..435f7a410a4 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -91,6 +91,7 @@ public:
             : Base(sink_id, dest_id, dest_id),
               _num_partitions(num_partitions),
               _texprs(texprs),
+              _partitioned_exprs_num(texprs.size()),
               _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
@@ -114,6 +115,7 @@ private:
     ExchangeType _type;
     const int _num_partitions;
     const std::vector<TExpr>& _texprs;
+    const size_t _partitioned_exprs_num;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
     std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 71b6304d428..d3fe8aec1f4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1214,7 +1214,9 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
 
         std::ranges::for_each(filters, [&](auto& filter) {
-            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+            filter->update_filter(
+                    filter_wrapper, request->merge_time(), start_apply,
+                    request->has_local_merge_time() ? 
request->local_merge_time() : 0);
         });
     }
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b11e8290d96..42eacf3a924 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -200,33 +200,6 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
-                                       butil::IOBufAsZeroCopyInputStream* 
data) {
-    SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
-    UpdateRuntimeFilterParams params(request, data);
-    int filter_id = request->filter_id();
-    std::vector<std::shared_ptr<IRuntimeFilter>> filters;
-    // The code is organized for upgrade compatibility to prevent infinite 
waiting
-    // old way update filter the code should be deleted after the upgrade is 
complete.
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        auto iter = _consumer_map.find(filter_id);
-        if (iter == _consumer_map.end()) {
-            return Status::InternalError("update_filter meet unknown filter: 
{}, role: CONSUMER.",
-                                         filter_id);
-        }
-        for (auto& holder : iter->second) {
-            filters.emplace_back(holder.filter);
-        }
-        iter->second.clear();
-    }
-    for (auto filter : filters) {
-        RETURN_IF_ERROR(filter->update_filter(&params));
-    }
-
-    return Status::OK();
-}
-
 void RuntimeFilterMgr::set_runtime_filter_params(
         const TRuntimeFilterParams& runtime_filter_params) {
     std::lock_guard l(_lock);
@@ -434,6 +407,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         DCHECK_LE(merged_size, cnt_val->producer_size);
         cnt_val->merge_time += (MonotonicMillis() - start_merge);
         merge_time = cnt_val->merge_time;
+        cnt_val->local_merge_time +=
+                request->has_local_merge_time() ? request->local_merge_time() 
: 0;
     }
 
     if (merged_size == cnt_val->producer_size) {
@@ -473,6 +448,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             closure->request_->set_is_pipeline(request->has_is_pipeline() &&
                                                request->is_pipeline());
             closure->request_->set_merge_time(merge_time);
+            closure->request_->set_local_merge_time(cnt_val->local_merge_time);
             *closure->request_->mutable_query_id() = request->query_id();
             if (has_attachment) {
                 
closure->cntl_->request_attachment().append(request_attachment);
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index dce051ab0d6..8d9aa2e557a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -34,6 +34,7 @@
 
 #include "common/object_pool.h"
 #include "common/status.h"
+#include "util/stopwatch.hpp"
 #include "util/uid_util.h"
 
 namespace butil {
@@ -60,6 +61,7 @@ struct LocalMergeFilters {
     int merge_size_times = 0;
     uint64_t local_merged_size = 0;
     std::vector<std::shared_ptr<IRuntimeFilter>> filters;
+    MonotonicStopWatch merge_watcher;
 };
 
 /// producer:
@@ -113,9 +115,6 @@ public:
                                     bool build_bf_exactly = false);
 
     // update filter by remote
-    Status update_filter(const PPublishFilterRequest* request,
-                         butil::IOBufAsZeroCopyInputStream* data);
-
     void set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
 
     Status get_merge_addr(TNetworkAddress* addr);
@@ -188,6 +187,7 @@ public:
         std::unordered_set<UniqueId> arrive_id;
         std::vector<PNetworkAddress> source_addrs;
         std::shared_ptr<ObjectPool> pool;
+        uint64_t local_merge_time = 0;
     };
 
 private:
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index f3764cea233..29868e5ff12 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -579,6 +579,7 @@ message PMergeFilterRequest {
     optional PColumnType column_type = 10;
     optional bool contain_null = 11;
     optional bool ignored = 12;
+    optional uint64 local_merge_time = 13;
 };
 
 message PMergeFilterResponse {
@@ -614,6 +615,7 @@ message PPublishFilterRequestV2 {
     optional bool contain_null = 10;
     optional bool ignored = 11;
     repeated int32 fragment_ids = 12;
+    optional uint64 local_merge_time = 13;
 };
 
 message PPublishFilterResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to