This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aabcab9dbe [Improvement](runtime filter) Improve merge phase (#18828)
aabcab9dbe is described below

commit aabcab9dbe8a461ba121aee742d78449e4b20b9f
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 26 21:01:20 2023 +0800

    [Improvement](runtime filter) Improve merge phase (#18828)
---
 be/src/exprs/bloom_filter_func.h                   |   2 +
 be/src/exprs/runtime_filter.cpp                    | 235 +++++++++++-----
 be/src/exprs/runtime_filter.h                      |  83 ++++--
 be/src/exprs/runtime_filter_rpc.cpp                |   4 +-
 be/src/olap/bloom_filter_predicate.h               |   3 +-
 be/src/runtime/fragment_mgr.cpp                    |  61 ++++-
 be/src/runtime/fragment_mgr.h                      |   3 +
 be/src/runtime/query_context.h                     |  34 ++-
 be/src/runtime/runtime_filter_mgr.cpp              | 299 ++++++++++++++++-----
 be/src/runtime/runtime_filter_mgr.h                |  19 +-
 be/src/service/internal_service.cpp                |  24 ++
 be/src/service/internal_service.h                  |   4 +
 be/src/vec/exec/join/vhash_join_node.cpp           |   3 +-
 be/src/vec/exec/join/vhash_join_node.h             |   1 +
 be/src/vec/exec/scan/vscan_node.cpp                |  31 ++-
 .../org/apache/doris/planner/RuntimeFilter.java    |   5 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  43 ++-
 gensrc/proto/internal_service.proto                |  14 +
 gensrc/thrift/PaloInternalService.thrift           |   8 +
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 20 files changed, 688 insertions(+), 190 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 11831ccdc0..6182bfe781 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -204,6 +204,8 @@ public:
         return Status::OK();
     }
 
+    size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 0; }
+
     void light_copy(BloomFilterFuncBase* bloomfilter_func) {
         auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
         _bloom_filter_alloced = other_func->_bloom_filter_alloced;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e2ec2d57a2..2a9d7cc24f 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -348,28 +348,52 @@ public:
     RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
                             const RuntimeFilterParams* params)
             : _state(state),
+              _be_exec_version(_state->be_exec_version()),
               _pool(pool),
               _column_return_type(params->column_return_type),
               _filter_type(params->filter_type),
-              _fragment_instance_id(params->fragment_instance_id),
               _filter_id(params->filter_id),
-              _use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
-                                                          _column_return_type)),
-              _use_new_hash(_state->be_exec_version() >= 2) {}
+              _use_batch(
+                      IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
+              _use_new_hash(_be_exec_version >= 2) {}
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
     RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type,
-                            RuntimeFilterType type, UniqueId fragment_instance_id,
-                            uint32_t filter_id)
+                            RuntimeFilterType type, uint32_t filter_id)
             : _state(state),
+              _be_exec_version(_state->be_exec_version()),
               _pool(pool),
               _column_return_type(column_type),
               _filter_type(type),
-              _fragment_instance_id(fragment_instance_id),
               _filter_id(filter_id),
-              _use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
-                                                          _column_return_type)),
-              _use_new_hash(_state->be_exec_version() >= 2) {}
+              _use_batch(
+                      IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
+              _use_new_hash(_be_exec_version >= 2) {}
+
+    RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
+                            const RuntimeFilterParams* params)
+            : _query_ctx(query_ctx),
+              _be_exec_version(_query_ctx->be_exec_version()),
+              _pool(pool),
+              _column_return_type(params->column_return_type),
+              _filter_type(params->filter_type),
+              _filter_id(params->filter_id),
+              _use_batch(
+                      IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
+              _use_new_hash(_be_exec_version >= 2) {}
+    // for a 'tmp' runtime predicate wrapper
+    // only could called assign method or as a param for merge
+    RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type,
+                            RuntimeFilterType type, uint32_t filter_id)
+            : _query_ctx(query_ctx),
+              _be_exec_version(_query_ctx->be_exec_version()),
+              _pool(pool),
+              _column_return_type(column_type),
+              _filter_type(type),
+              _filter_id(filter_id),
+              _use_batch(
+                      IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
+              _use_new_hash(_be_exec_version >= 2) {}
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
     Status init(const RuntimeFilterParams* params) {
@@ -542,8 +566,7 @@ public:
     void insert_batch(const vectorized::ColumnPtr column, const std::vector<int>& rows) {
         if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
             bitmap_filter_insert_batch(column, rows);
-        } else if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
-                                                    _column_return_type)) {
+        } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)) {
             insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size());
         } else {
             for (int index : rows) {
@@ -571,7 +594,14 @@ public:
         return real_filter_type;
     }
 
-    Status get_push_vexprs(std::vector<vectorized::VExpr*>* container, RuntimeState* state,
+    size_t get_bloom_filter_size() {
+        if (_is_bloomfilter) {
+            return _context.bloom_filter_func->get_size();
+        }
+        return 0;
+    }
+
+    Status get_push_vexprs(std::vector<vectorized::VExpr*>* container,
                            vectorized::VExprContext* prob_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
@@ -583,7 +613,6 @@ public:
                                    _filter_type != wrapper->_filter_type;
 
         CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
-                << "fragment instance " << _fragment_instance_id.to_string()
                 << " can not merge runtime filter(id=" << _filter_id
                 << "), current is filter type is " << to_string(_filter_type)
                 << ", other filter type is " << 
to_string(wrapper->_filter_type);
@@ -593,8 +622,7 @@ public:
             if (_is_ignored_in_filter) {
                 break;
             } else if (wrapper->_is_ignored_in_filter) {
-                VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                           << " ignore merge runtime filter(in filter id " << 
_filter_id
+                VLOG_DEBUG << " ignore merge runtime filter(in filter id " << 
_filter_id
                            << ") because: " << 
*(wrapper->get_ignored_in_filter_msg());
 
                 _is_ignored_in_filter = true;
@@ -608,8 +636,7 @@ public:
             if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
 #ifdef VLOG_DEBUG_IS_ON
                 std::stringstream msg;
-                msg << "fragment instance " << 
_fragment_instance_id.to_string()
-                    << " ignore merge runtime filter(in filter id " << 
_filter_id
+                msg << " ignore merge runtime filter(in filter id " << 
_filter_id
                     << ") because: in_num(" << _context.hybrid_set->size() << 
") >= max_in_num("
                     << _max_in_num << ")";
                 _ignored_in_filter_msg = _pool->add(new 
std::string(msg.str()));
@@ -637,22 +664,19 @@ public:
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
                 if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// in merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
-                            << "fragment instance " << 
_fragment_instance_id.to_string()
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
                             << *(wrapper->get_ignored_in_filter_msg());
                     
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
                     if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
-                        VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                                   << " change runtime filter to bloom 
filter(id=" << _filter_id
+                        VLOG_DEBUG << " change runtime filter to bloom 
filter(id=" << _filter_id
                                    << ") because: in_num(" << 
_context.hybrid_set->size()
                                    << ") >= max_in_num(" << _max_in_num << ")";
                         change_to_bloom_filter();
                     }
                     // in merge bloom filter
                 } else {
-                    VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                               << " change runtime filter to bloom filter(id=" 
<< _filter_id
+                    VLOG_DEBUG << " change runtime filter to bloom filter(id=" 
<< _filter_id
                                << ") because: already exist a bloom filter";
                     change_to_bloom_filter();
                     
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
@@ -661,7 +685,6 @@ public:
                 if (wrapper->_filter_type ==
                     RuntimeFilterType::IN_FILTER) { // bloom filter merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
-                            << "fragment instance " << 
_fragment_instance_id.to_string()
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
                             << *(wrapper->get_ignored_in_filter_msg());
@@ -1036,6 +1059,8 @@ public:
 
 private:
     RuntimeState* _state;
+    QueryContext* _query_ctx;
+    int _be_exec_version;
     ObjectPool* _pool;
 
     // When a runtime filter received from remote and it is a bloom filter, 
_column_return_type will be invalid.
@@ -1047,7 +1072,6 @@ private:
     bool _is_bloomfilter = false;
     bool _is_ignored_in_filter = false;
     std::string* _ignored_in_filter_msg = nullptr;
-    UniqueId _fragment_instance_id;
     uint32_t _filter_id;
 
     // When _column_return_type is invalid, _use_batch will be always false.
@@ -1063,9 +1087,16 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
                               int node_id, IRuntimeFilter** res, bool build_bf_exactly) {
     *res = pool->add(new IRuntimeFilter(state, pool));
     (*res)->set_role(role);
-    UniqueId fragment_instance_id(state->fragment_instance_id());
-    return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id,
-                                  build_bf_exactly);
+    return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
+}
+
+Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool,
+                              const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
+                              const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
+                              bool build_bf_exactly) {
+    *res = pool->add(new IRuntimeFilter(query_ctx, pool));
+    (*res)->set_role(role);
+    return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
 }
 
 void 
IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& 
context) {
@@ -1100,6 +1131,7 @@ Status IRuntimeFilter::publish() {
     if (_has_local_target) {
         IRuntimeFilter* consumer_filter = nullptr;
         // TODO: log if err
+        DCHECK(_state != nullptr);
         RETURN_IF_ERROR(
                 _state->runtime_filter_mgr()->get_consume_filter(_filter_id, 
&consumer_filter));
         // push down
@@ -1109,8 +1141,9 @@ Status IRuntimeFilter::publish() {
         return Status::OK();
     } else {
         TNetworkAddress addr;
+        DCHECK(_state != nullptr);
         RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
-        return push_to_remote(_state, &addr);
+        return push_to_remote(_state, &addr, _opt_remote_rf);
     }
 }
 
@@ -1124,7 +1157,7 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
     if (!_is_ignored) {
         _set_push_down();
         _profile->add_info_string("Info", _format_status());
-        return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx);
+        return _wrapper->get_push_vexprs(push_vexprs, _vprobe_ctx);
     } else {
         _profile->add_info_string("Info", _format_status());
         return Status::OK();
@@ -1132,32 +1165,35 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
 }
 
 Status IRuntimeFilter::get_prepared_vexprs(std::vector<vectorized::VExpr*>* 
vexprs,
-                                           const RowDescriptor& desc) {
+                                           const RowDescriptor& desc, 
RuntimeState* state) {
     _profile->add_info_string("Info", _format_status());
     if (_is_ignored) {
         return Status::OK();
     }
-    DCHECK((!_state->enable_pipeline_exec() && _rf_state == 
RuntimeFilterState::READY) ||
-           (_state->enable_pipeline_exec() &&
+    DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) 
||
+           (_enable_pipeline_exec &&
             _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY));
     DCHECK(is_consumer());
     std::lock_guard guard(_inner_mutex);
 
     if (_push_down_vexprs.empty()) {
-        RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state, 
_vprobe_ctx));
+        RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, 
_vprobe_ctx));
     }
-    // push expr
     vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), 
_push_down_vexprs.end());
     return Status::OK();
 }
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
+    auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
+                                               : _state->execution_timeout() * 1000;
+    auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
+                                                         : _state->runtime_filter_wait_time_ms();
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
-                                    ? _state->execution_timeout() * 1000
-                                    : _state->runtime_filter_wait_time_ms();
-    if (_state->enable_pipeline_exec()) {
+                                    ? execution_timeout
+                                    : runtime_filter_wait_time_ms;
+    if (_enable_pipeline_exec) {
         auto expected = _rf_state_atomic.load(std::memory_order_acquire);
         if (expected == RuntimeFilterState::NOT_READY) {
             if (!_rf_state_atomic.compare_exchange_strong(
@@ -1203,12 +1239,16 @@ bool IRuntimeFilter::await() {
 bool IRuntimeFilter::is_ready_or_timeout() {
     DCHECK(is_consumer());
     auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
+                                               : _state->execution_timeout() * 1000;
+    auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
+                                                         : _state->runtime_filter_wait_time_ms();
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
-                                    ? _state->execution_timeout() * 1000
-                                    : _state->runtime_filter_wait_time_ms();
+                                    ? execution_timeout
+                                    : runtime_filter_wait_time_ms;
     int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-    if (!_state->enable_pipeline_exec()) {
+    if (!_enable_pipeline_exec) {
         _rf_state = RuntimeFilterState::TIME_OUT;
         return true;
     } else if (is_ready()) {
@@ -1245,7 +1285,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
-    if (_state->enable_pipeline_exec()) {
+    if (_enable_pipeline_exec) {
         _rf_state_atomic.store(RuntimeFilterState::READY);
     } else {
         std::unique_lock lock(_inner_mutex);
@@ -1261,6 +1301,10 @@ void IRuntimeFilter::signal() {
         _profile->add_info_string("BitmapSize", 
std::to_string(bitmap_filter->size()));
         _profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ? 
"true" : "false");
     }
+    if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
+        _profile->add_info_string("BloomFilterSize",
+                                  
std::to_string(_wrapper->get_bloom_filter_size()));
+    }
 }
 
 BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
@@ -1268,8 +1312,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() 
const {
 }
 
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options,
-                                      UniqueId fragment_instance_id, int 
node_id,
-                                      bool build_bf_exactly) {
+                                      int node_id, bool build_bf_exactly) {
     // if node_id == -1 , it shouldn't be a consumer
     DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
 
@@ -1292,15 +1335,19 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     _has_remote_target = desc->has_remote_targets;
     _expr_order = desc->expr_order;
     _filter_id = desc->filter_id;
+    _opt_remote_rf = desc->__isset.opt_remote_rf && desc->opt_remote_rf;
     vectorized::VExprContext* build_ctx = nullptr;
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr, 
&build_ctx));
 
     RuntimeFilterParams params;
-    params.fragment_instance_id = fragment_instance_id;
     params.filter_id = _filter_id;
     params.filter_type = _runtime_filter_type;
     params.column_return_type = build_ctx->root()->type().type;
     params.max_in_num = options->runtime_filter_max_in_num;
+    // We build runtime filter by exact distinct count iff three conditions 
are met:
+    // 1. Only 1 join key
+    // 2. Do not have remote target (e.g. do not need to merge)
+    // 3. Bloom filter
     params.build_bf_exactly = build_bf_exactly && !_has_remote_target &&
                               _runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER;
     if (desc->__isset.bloom_filter_size_bytes) {
@@ -1334,7 +1381,11 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
         RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, 
iter->second, &_vprobe_ctx));
     }
 
-    _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
+    if (_state) {
+        _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, 
&params));
+    } else {
+        _wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, 
&params));
+    }
     return _wrapper->init(&params);
 }
 
@@ -1346,6 +1397,10 @@ Status IRuntimeFilter::serialize(PPublishFilterRequest* 
request, void** data, in
     return serialize_impl(request, data, len);
 }
 
+Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** 
data, int* len) {
+    return serialize_impl(request, data, len);
+}
+
 Status IRuntimeFilter::create_wrapper(RuntimeState* state, const 
MergeRuntimeFilterParams* param,
                                       ObjectPool* pool,
                                       
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
@@ -1358,6 +1413,35 @@ Status IRuntimeFilter::create_wrapper(RuntimeState* 
state, const UpdateRuntimeFi
     return _create_wrapper(state, param, pool, wrapper);
 }
 
+Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx,
+                                      const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
+                                      std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+    int filter_type = param->request->filter_type();
+    PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
+    if (param->request->has_in_filter()) {
+        column_type = to_primitive_type(param->request->in_filter().column_type());
+    }
+    wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type),
+                                               param->request->filter_id()));
+
+    switch (filter_type) {
+    case PFilterType::IN_FILTER: {
+        DCHECK(param->request->has_in_filter());
+        return (*wrapper)->assign(&param->request->in_filter());
+    }
+    case PFilterType::BLOOM_FILTER: {
+        DCHECK(param->request->has_bloom_filter());
+        return (*wrapper)->assign(&param->request->bloom_filter(), param->data);
+    }
+    case PFilterType::MINMAX_FILTER: {
+        DCHECK(param->request->has_minmax_filter());
+        return (*wrapper)->assign(&param->request->minmax_filter());
+    }
+    default:
+        return Status::InvalidArgument("unknown filter type");
+    }
+}
+
 void IRuntimeFilter::change_to_bloom_filter() {
     auto origin_type = _wrapper->get_real_type();
     _wrapper->change_to_bloom_filter();
@@ -1379,7 +1463,6 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* 
state, const T* param, Obje
         column_type = 
to_primitive_type(param->request->in_filter().column_type());
     }
     wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, 
get_type(filter_type),
-                                               
UniqueId(param->request->fragment_id()),
                                                param->request->filter_id()));
 
     switch (filter_type) {
@@ -1401,11 +1484,22 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* 
state, const T* param, Obje
 }
 
 void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
-    DCHECK(parent_profile != nullptr);
-    _profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
-                                                  ::doris::to_string(_runtime_filter_type))));
+    if (_profile_init) {
+        return;
+    }
+    {
+        std::lock_guard guard(_inner_mutex);
+        if (_profile_init) {
+            return;
+        }
+        DCHECK(parent_profile != nullptr);
+        _profile.reset(
+                new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
+                                               ::doris::to_string(_runtime_filter_type))));
+        _profile_init = true;
+    }
     parent_profile->add_child(_profile.get(), true, nullptr);
-    if (!_state->enable_pipeline_exec()) {
+    if (!_enable_pipeline_exec) {
         _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
     }
     _profile->add_info_string("Info", _format_status());
@@ -1421,10 +1515,6 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
     }
 }
 
-void IRuntimeFilter::set_push_down_profile() {
-    _profile->add_info_string("HasPushDownToEngine", "true");
-}
-
 void IRuntimeFilter::ready_for_publish() {
     _wrapper->ready_for_publish();
 }
@@ -1446,10 +1536,6 @@ Status IRuntimeFilter::merge_from(const 
RuntimePredicateWrapper* wrapper) {
     return status;
 }
 
-const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() {
-    return _wrapper;
-}
-
 template <typename T>
 void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it,
                 void (*set_func)(PColumnValue*, const T*)) {
@@ -1752,21 +1838,46 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
     return Status::OK();
 }
 
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
+    int64_t start_update = MonotonicMillis();
+    if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
+        set_ignored();
+        const PInFilter in_filter = param->request->in_filter();
+        auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
+        set_ignored_msg(*msg);
+    }
+
+    std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
+    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper));
+    auto origin_type = _wrapper->get_real_type();
+    RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
+    if (origin_type != _wrapper->get_real_type()) {
+        update_runtime_filter_type_to_profile();
+    }
+    this->signal();
+
+    _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms");
+    _profile->add_info_string("UpdateTime",
+                              std::to_string(MonotonicMillis() - start_update) + " ms");
+    return Status::OK();
+}
+
 Status IRuntimeFilter::consumer_close() {
     DCHECK(is_consumer());
     return Status::OK();
 }
 
 Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<vectorized::VExpr*>* 
container,
-                                                RuntimeState* state,
                                                 vectorized::VExprContext* 
vprob_expr) {
-    DCHECK(state != nullptr);
     DCHECK(container != nullptr);
     DCHECK(_pool != nullptr);
     DCHECK(vprob_expr->root()->type().type == _column_return_type ||
            (is_string_type(vprob_expr->root()->type().type) &&
             is_string_type(_column_return_type)) ||
-           _filter_type == RuntimeFilterType::BITMAP_FILTER);
+           _filter_type == RuntimeFilterType::BITMAP_FILTER)
+            << " vprob_expr->root()->type().type: " << 
vprob_expr->root()->type().type
+            << " _column_return_type: " << _column_return_type
+            << " _filter_type: " << ::doris::to_string(_filter_type);
 
     auto real_filter_type = get_real_type();
     switch (real_filter_type) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a2f6995d59..52191b5182 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -32,6 +32,7 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/lock.h"
@@ -51,6 +52,7 @@ namespace doris {
 class ObjectPool;
 class RuntimePredicateWrapper;
 class PPublishFilterRequest;
+class PPublishFilterRequestV2;
 class PMergeFilterRequest;
 class TRuntimeFilterDesc;
 class RowDescriptor;
@@ -106,7 +108,6 @@ struct RuntimeFilterParams {
               bloom_filter_size(-1),
               max_in_num(0),
               filter_id(0),
-              fragment_instance_id(0, 0),
               bitmap_filter_not_in(false) {}
 
     RuntimeFilterType filter_type;
@@ -115,7 +116,6 @@ struct RuntimeFilterParams {
     int64_t bloom_filter_size;
     int32_t max_in_num;
     int32_t filter_id;
-    UniqueId fragment_instance_id;
     bool bitmap_filter_not_in;
     bool build_bf_exactly;
 };
@@ -129,6 +129,16 @@ struct UpdateRuntimeFilterParams {
     ObjectPool* pool;
 };
 
+struct UpdateRuntimeFilterParamsV2 {
+    UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req,
+                                butil::IOBufAsZeroCopyInputStream* data_stream,
+                                ObjectPool* obj_pool)
+            : request(req), data(data_stream), pool(obj_pool) {}
+    const PPublishFilterRequestV2* request;
+    butil::IOBufAsZeroCopyInputStream* data;
+    ObjectPool* pool;
+};
+
 struct MergeRuntimeFilterParams {
     MergeRuntimeFilterParams(const PMergeFilterRequest* req,
                              butil::IOBufAsZeroCopyInputStream* data_stream)
@@ -164,7 +174,25 @@ public:
               _expr_order(-1),
               _always_true(false),
               _is_ignored(false),
-              registration_time_(MonotonicMillis()) {}
+              registration_time_(MonotonicMillis()),
+              _enable_pipeline_exec(_state->enable_pipeline_exec()) {}
+
+    IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool)
+            : _query_ctx(query_ctx),
+              _pool(pool),
+              _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+              _filter_id(-1),
+              _is_broadcast_join(true),
+              _has_remote_target(false),
+              _has_local_target(false),
+              _rf_state(RuntimeFilterState::NOT_READY),
+              _rf_state_atomic(RuntimeFilterState::NOT_READY),
+              _role(RuntimeFilterRole::PRODUCER),
+              _expr_order(-1),
+              _always_true(false),
+              _is_ignored(false),
+              registration_time_(MonotonicMillis()),
+              _enable_pipeline_exec(query_ctx->enable_pipeline_exec()) {}
 
     ~IRuntimeFilter() = default;
 
@@ -172,6 +200,10 @@ public:
                          const TQueryOptions* query_options, const 
RuntimeFilterRole role,
                          int node_id, IRuntimeFilter** res, bool 
build_bf_exactly = false);
 
+    static Status create(QueryContext* query_ctx, ObjectPool* pool, const 
TRuntimeFilterDesc* desc,
+                         const TQueryOptions* query_options, const 
RuntimeFilterRole role,
+                         int node_id, IRuntimeFilter** res, bool 
build_bf_exactly = false);
+
     void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& 
context);
     Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& 
context);
 
@@ -192,20 +224,19 @@ public:
     Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
 
     Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* 
push_vexprs,
-                               const RowDescriptor& desc);
+                               const RowDescriptor& desc, RuntimeState* state);
 
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
     bool has_remote_target() const { return _has_remote_target; }
 
     bool is_ready() const {
-        return (!_state->enable_pipeline_exec() && _rf_state == 
RuntimeFilterState::READY) ||
-               (_state->enable_pipeline_exec() &&
+        return (!_enable_pipeline_exec && _rf_state == 
RuntimeFilterState::READY) ||
+               (_enable_pipeline_exec &&
                 _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY);
     }
     RuntimeFilterState current_state() const {
-        return _state->enable_pipeline_exec() ? 
_rf_state_atomic.load(std::memory_order_acquire)
-                                              : _rf_state;
+        return _enable_pipeline_exec ? 
_rf_state_atomic.load(std::memory_order_acquire) : _rf_state;
     }
     bool is_ready_or_timeout();
 
@@ -226,33 +257,34 @@ public:
 
     // init filter with desc
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* 
options,
-                          UniqueId fragment_id, int node_id = -1, bool 
build_bf_exactly = false);
+                          int node_id = -1, bool build_bf_exactly = false);
 
     BloomFilterFuncBase* get_bloomfilter() const;
 
     // serialize _wrapper to protobuf
     Status serialize(PMergeFilterRequest* request, void** data, int* len);
     Status serialize(PPublishFilterRequest* request, void** data = nullptr, 
int* len = nullptr);
+    Status serialize(PPublishFilterRequestV2* request, void** data = nullptr, 
int* len = nullptr);
 
     Status merge_from(const RuntimePredicateWrapper* wrapper);
 
     // for ut
-    const RuntimePredicateWrapper* get_wrapper();
     static Status create_wrapper(RuntimeState* state, const 
MergeRuntimeFilterParams* param,
                                  ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
     static Status create_wrapper(RuntimeState* state, const 
UpdateRuntimeFilterParams* param,
                                  ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
+    static Status create_wrapper(QueryContext* query_ctx, const 
UpdateRuntimeFilterParamsV2* param,
+                                 ObjectPool* pool,
+                                 std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
     void change_to_bloom_filter();
     Status init_bloom_filter(const size_t build_bf_cardinality);
     Status update_filter(const UpdateRuntimeFilterParams* param);
+    Status update_filter(const UpdateRuntimeFilterParamsV2* param);
 
     void set_ignored() { _is_ignored = true; }
 
-    // for ut
-    bool is_ignored() const { return _is_ignored; }
-
     void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
 
     // for ut
@@ -262,21 +294,17 @@ public:
     Status consumer_close();
 
     // async push runtimefilter to remote node
-    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
+    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, 
bool opt_remote_rf);
     Status join_rpc();
 
     void init_profile(RuntimeProfile* parent_profile);
 
     void update_runtime_filter_type_to_profile();
 
-    void set_push_down_profile();
-
     void ready_for_publish();
 
-    std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const;
-
-    static bool enable_use_batch(int be_exec_version, PrimitiveType type) {
-        return be_exec_version > 0 && (is_int_or_bool(type) || 
is_float_or_double(type));
+    // Batch evaluation is used only when the caller enables it (`use_batch`) and `type` is
+    // accepted by is_int_or_bool() or is_float_or_double().
+    static bool enable_use_batch(bool use_batch, PrimitiveType type) {
+        if (!use_batch) {
+            return false;
+        }
+        return is_int_or_bool(type) || is_float_or_double(type);
     }
 
     int filter_id() const { return _filter_id; }
@@ -304,7 +332,7 @@ protected:
     }
 
     std::string _get_explain_state_string() {
-        if (_state->enable_pipeline_exec()) {
+        if (_enable_pipeline_exec) {
             return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY
                            ? "READY"
                    : _rf_state_atomic.load(std::memory_order_acquire) ==
@@ -318,11 +346,12 @@ protected:
         }
     }
 
-    RuntimeState* _state;
+    RuntimeState* _state = nullptr;
+    QueryContext* _query_ctx = nullptr;
     ObjectPool* _pool;
     // _wrapper is a runtime filter function wrapper
     // _wrapper should alloc from _pool
-    RuntimePredicateWrapper* _wrapper = nullptr;
+    RuntimePredicateWrapper* _wrapper;
     // runtime filter type
     RuntimeFilterType _runtime_filter_type;
     // runtime filter id
@@ -350,7 +379,7 @@ protected:
     // this filter won't filter any data
     bool _always_true;
 
-    doris::vectorized::VExprContext* _vprobe_ctx;
+    doris::vectorized::VExprContext* _vprobe_ctx = nullptr;
 
     // Indicate whether runtime filter expr has been ignored
     bool _is_ignored;
@@ -370,6 +399,12 @@ protected:
 
     /// Time in ms (from MonotonicMillis()), that the filter was registered.
     const int64_t registration_time_;
+
+    const bool _enable_pipeline_exec;
+
+    bool _profile_init = false;
+
+    // Defaults to false so the flag never holds an indeterminate value before it is assigned.
+    bool _opt_remote_rf = false;
 };
 
 // avoid expose RuntimePredicateWrapper
diff --git a/be/src/exprs/runtime_filter_rpc.cpp 
b/be/src/exprs/runtime_filter_rpc.cpp
index 8a5c4acb9b..829224f3f2 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -45,7 +45,8 @@ struct IRuntimeFilter::rpc_context {
     brpc::CallId cid;
 };
 
-Status IRuntimeFilter::push_to_remote(RuntimeState* state, const 
TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const 
TNetworkAddress* addr,
+                                      bool opt_remote_rf) {
     DCHECK(is_producer());
     DCHECK(_rpc_context == nullptr);
     std::shared_ptr<PBackendService_Stub> stub(
@@ -69,6 +70,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, 
const TNetworkAddress
     pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
 
     _rpc_context->request.set_filter_id(_filter_id);
+    _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
     _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
     _rpc_context->cntl.set_timeout_ms(1000);
     _rpc_context->cid = _rpc_context->cntl.call_id();
diff --git a/be/src/olap/bloom_filter_predicate.h 
b/be/src/olap/bloom_filter_predicate.h
index 9090ea86a3..73c2e9fba1 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -89,8 +89,7 @@ private:
                     }
                 }
             }
-
-        } else if (IRuntimeFilter::enable_use_batch(_be_exec_version, T)) {
+        } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) {
             const auto& data =
                     reinterpret_cast<
                             const 
vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7a7ec74835..e2a8802fd9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -18,6 +18,7 @@
 #include "runtime/fragment_mgr.h"
 
 #include <bvar/latency_recorder.h>
+#include <exprs/runtime_filter.h>
 #include <fmt/format.h>
 #include <gen_cpp/DorisExternalService_types.h>
 #include <gen_cpp/FrontendService.h>
@@ -640,7 +641,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(params.fragment_num_on_host, 
_exec_env);
+        query_ctx = QueryContext::create_shared(params.fragment_num_on_host, 
_exec_env,
+                                                params.query_options);
         query_ctx->query_id = query_id;
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), 
params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
@@ -1155,10 +1157,65 @@ Status FragmentMgr::apply_filter(const 
PPublishFilterRequest* request,
     return runtime_filter_mgr->update_filter(request, attach_data);
 }
 
+// Applies a published (V2) runtime filter to the matching consumer filter on this backend.
+//
+// Only the first id in `request->fragment_instance_ids()` is used: it locates a registered
+// fragment (in `_pipeline_map` or `_fragment_map`, depending on the request's pipeline flag)
+// whose QueryContext supplies the RuntimeFilterMgr and the ObjectPool for the update.
+// A request that carries no fragment instance ids is a no-op.
+//
+// Returns InvalidArgument when the fragment instance is unknown, the first error reported by
+// get_consume_filter() / update_filter(), and OK otherwise.
+Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
+                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
+    const auto& instance_ids = request->fragment_instance_ids();
+    if (instance_ids.size() == 0) {
+        return Status::OK();
+    }
+
+    const bool use_pipeline = request->has_is_pipeline() && request->is_pipeline();
+    UniqueId fragment_instance_id = instance_ids[0];
+    const TUniqueId instance_key = fragment_instance_id.to_thrift();
+
+    // Declared outside the branches so the looked-up context remains referenced until the
+    // filter update below has finished.
+    std::shared_ptr<FragmentExecState> fragment_state;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+    RuntimeFilterMgr* filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+    if (use_pipeline) {
+        std::unique_lock<std::mutex> lock(_lock);
+        auto iter = _pipeline_map.find(instance_key);
+        if (iter == _pipeline_map.end()) {
+            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
+            return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+        }
+        pip_context = iter->second;
+
+        DCHECK(pip_context != nullptr);
+        filter_mgr = pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
+        pool = &pip_context->get_query_context()->obj_pool;
+    } else {
+        std::unique_lock<std::mutex> lock(_lock);
+        auto iter = _fragment_map.find(instance_key);
+        if (iter == _fragment_map.end()) {
+            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
+            return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+        }
+        fragment_state = iter->second;
+
+        DCHECK(fragment_state != nullptr);
+        filter_mgr = fragment_state->executor()
+                             ->runtime_state()
+                             ->get_query_ctx()
+                             ->runtime_filter_mgr();
+        pool = &fragment_state->get_query_ctx()->obj_pool;
+    }
+
+    UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
+    IRuntimeFilter* real_filter = nullptr;
+    RETURN_IF_ERROR(filter_mgr->get_consume_filter(request->filter_id(), &real_filter));
+    RETURN_IF_ERROR(real_filter->update_filter(&params));
+    return Status::OK();
+}
+
 Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* 
attach_data) {
     UniqueId queryid = request->query_id();
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+    bool opt_remote_rf = request->has_opt_remote_rf() && 
request->opt_remote_rf();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
 
@@ -1189,7 +1246,7 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
         // when filter_controller->merge is still in progress
         fragment_state = iter->second;
     }
-    RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
+    RETURN_IF_ERROR(filter_controller->merge(request, attach_data, 
opt_remote_rf));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index ad7f830a7f..8ca58ccffa 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -129,6 +129,9 @@ public:
     Status apply_filter(const PPublishFilterRequest* request,
                         butil::IOBufAsZeroCopyInputStream* attach_data);
 
+    Status apply_filterv2(const PPublishFilterRequestV2* request,
+                          butil::IOBufAsZeroCopyInputStream* attach_data);
+
     Status merge_filter(const PMergeFilterRequest* request,
                         butil::IOBufAsZeroCopyInputStream* attach_data);
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b941f0fe86..0909702c70 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -30,6 +30,7 @@
 #include "runtime/datetime_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
 #include "task_group/task_group.h"
 #include "util/pretty_printer.h"
@@ -48,8 +49,12 @@ class QueryContext {
     ENABLE_FACTORY_CREATOR(QueryContext);
 
 public:
-    QueryContext(int total_fragment_num, ExecEnv* exec_env)
-            : fragment_num(total_fragment_num), timeout_second(-1), 
_exec_env(exec_env) {
+    QueryContext(int total_fragment_num, ExecEnv* exec_env, const 
TQueryOptions& query_options)
+            : fragment_num(total_fragment_num),
+              timeout_second(-1),
+              _exec_env(exec_env),
+              _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
+              _query_options(query_options) {
         _start_time = vectorized::VecDateTimeValue::local_time();
         _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
         _shared_scanner_controller.reset(new 
vectorized::SharedScannerController());
@@ -139,6 +144,29 @@ public:
 
     taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
 
+    // Timeout taken from the query options: `execution_timeout` when that field is set,
+    // otherwise the `query_timeout` value.
+    int execution_timeout() const {
+        if (_query_options.__isset.execution_timeout) {
+            return _query_options.execution_timeout;
+        }
+        return _query_options.query_timeout;
+    }
+
+    // Returns the `runtime_filter_wait_time_ms` value carried by this query's options.
+    int32_t runtime_filter_wait_time_ms() const {
+        return _query_options.runtime_filter_wait_time_ms;
+    }
+
+    // True only when the query options both set `enable_pipeline_engine` and set it to true.
+    bool enable_pipeline_exec() const {
+        if (!_query_options.__isset.enable_pipeline_engine) {
+            return false;
+        }
+        return _query_options.enable_pipeline_engine;
+    }
+
+    // `be_exec_version` from the query options, or 0 when the field is not set.
+    int be_exec_version() const {
+        return _query_options.__isset.be_exec_version ? _query_options.be_exec_version : 0;
+    }
+
+    // Non-owning pointer to the RuntimeFilterMgr held by this context's `_runtime_filter_mgr`.
+    RuntimeFilterMgr* runtime_filter_mgr() {
+        return _runtime_filter_mgr.get();
+    }
+
 public:
     TUniqueId query_id;
     DescriptorTbl* desc_tbl;
@@ -186,6 +214,8 @@ private:
     vectorized::RuntimePredicate _runtime_predicate;
 
     taskgroup::TaskGroupPtr _task_group;
+    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
+    const TQueryOptions _query_options;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index e956122130..b6dfa0a625 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -35,6 +35,7 @@
 #include "exprs/runtime_filter.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/brpc_client_cache.h"
@@ -51,10 +52,12 @@ struct async_rpc_context {
 
 RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* 
state) : _state(state) {}
 
+// Creates a manager bound to a QueryContext instead of a RuntimeState.
+// `query_id` is accepted but not used by this constructor; only `_query_ctx` is set here.
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* 
query_ctx)
+        : _query_ctx(query_ctx) {}
+
 RuntimeFilterMgr::~RuntimeFilterMgr() {}
 
 Status RuntimeFilterMgr::init() {
-    DCHECK(_state->query_mem_tracker() != nullptr);
     _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
                                             
ExecEnv::GetInstance()->experimental_mem_tracker());
     return Status::OK();
@@ -107,20 +110,41 @@ Status RuntimeFilterMgr::register_filter(const 
RuntimeFilterRole role,
     VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role;
 
     auto iter = filter_map->find(key);
-    if (iter != filter_map->end()) {
-        return Status::InvalidArgument("filter has registed");
-    }
 
     RuntimeFilterMgrVal filter_mgr_val;
     filter_mgr_val.role = role;
 
-    RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, 
role, node_id,
-                                           &filter_mgr_val.filter, 
build_bf_exactly));
-
-    filter_map->emplace(key, filter_mgr_val);
+    if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == 
RuntimeFilterRole::CONSUMER &&
+        desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) {
+        // if this runtime filter has remote target (e.g. need merge), we 
reuse the runtime filter between all instances
+        DCHECK(_query_ctx != nullptr);
+        if (iter != filter_map->end()) {
+            return Status::OK();
+        }
+        {
+            std::lock_guard<std::mutex> l(_lock);
+            iter = filter_map->find(key);
+            if (iter != filter_map->end()) {
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, 
&_query_ctx->obj_pool, &desc,
+                                                   &options, role, node_id, 
&filter_mgr_val.filter,
+                                                   build_bf_exactly));
+            filter_map->emplace(key, filter_mgr_val);
+        }
+    } else {
+        DCHECK(_state != nullptr);
+        if (iter != filter_map->end()) {
+            return Status::InvalidArgument("filter has registed");
+        }
+        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, 
&options, role, node_id,
+                                               &filter_mgr_val.filter, 
build_bf_exactly));
+        filter_map->emplace(key, filter_mgr_val);
+    }
 
     return Status::OK();
 }
+
 Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
                                        butil::IOBufAsZeroCopyInputStream* 
data) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -158,12 +182,34 @@ Status 
RuntimeFilterMergeControllerEntity::_init_with_desc(
     cntVal->runtime_filter_desc = *runtime_filter_desc;
     cntVal->target_info = *target_info;
     cntVal->pool.reset(new ObjectPool());
-    cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, 
cntVal->pool.get()));
+    cntVal->filter =
+            cntVal->pool->add(new IRuntimeFilter(_state, 
&_state->get_query_ctx()->obj_pool));
 
     std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
     // LOG(INFO) << "entity filter id:" << filter_id;
-    cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options,
-                                   _fragment_instance_id, -1, false);
+    cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options, -1, false);
+    _filter_map.emplace(filter_id, cntVal);
+    return Status::OK();
+}
+
+// Registers the merge-side bookkeeping for one runtime filter whose targets are described by
+// V2 target params.
+//
+// The descriptor and the target list are copied into a new RuntimeFilterCntlVal, a fresh
+// IRuntimeFilter is created and initialized from the copied descriptor, and the entry is
+// stored in `_filter_map` under the decimal string of the filter id. The whole operation runs
+// under `_filter_map_mutex`.
+//
+// Returns the error from IRuntimeFilter::init_with_desc() if initialization fails (in which
+// case nothing is added to `_filter_map`), otherwise OK.
+Status RuntimeFilterMergeControllerEntity::_init_with_desc(
+        const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
+        const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
+        const int producer_size) {
+    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>();
+    // runtime_filter_desc and target will be released,
+    // so we need to copy to cntVal
+    cntVal->producer_size = producer_size;
+    cntVal->runtime_filter_desc = *runtime_filter_desc;
+    cntVal->targetv2_info = *targetv2_info;
+    cntVal->pool.reset(new ObjectPool());
+    cntVal->filter =
+            cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
+
+    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+    // LOG(INFO) << "entity filter id:" << filter_id;
+    // Propagate a failed initialization instead of registering a half-built filter.
+    RETURN_IF_ERROR(cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options));
     _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
@@ -179,26 +225,60 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id, UniqueId frag
     for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) 
{
         int filter_id = filterid_to_desc.first;
         const auto& target_iter = 
runtime_filter_params.rid_to_target_param.find(filter_id);
-        if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+        if (target_iter == runtime_filter_params.rid_to_target_param.end() &&
+            !runtime_filter_params.__isset.rid_to_target_paramv2) {
             return Status::InternalError("runtime filter params meet error");
+        } else if (target_iter == 
runtime_filter_params.rid_to_target_param.end()) {
+            const auto& targetv2_iter = 
runtime_filter_params.rid_to_target_paramv2.find(filter_id);
+            if (targetv2_iter == 
runtime_filter_params.rid_to_target_paramv2.end()) {
+                return Status::InternalError("runtime filter params meet 
error");
+            }
+            const auto& build_iter =
+                    
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+            if (build_iter == 
runtime_filter_params.runtime_filter_builder_num.end()) {
+                return Status::InternalError("runtime filter params meet 
error");
+            }
+            _init_with_desc(&filterid_to_desc.second, &query_options, 
&targetv2_iter->second,
+                            build_iter->second);
+        } else {
+            const auto& build_iter =
+                    
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+            if (build_iter == 
runtime_filter_params.runtime_filter_builder_num.end()) {
+                return Status::InternalError("runtime filter params meet 
error");
+            }
+            _init_with_desc(&filterid_to_desc.second, &query_options, 
&target_iter->second,
+                            build_iter->second);
         }
-        const auto& build_iter = 
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
-        if (build_iter == 
runtime_filter_params.runtime_filter_builder_num.end()) {
-            return Status::InternalError("runtime filter params meet error");
+    }
+    if (runtime_filter_params.__isset.rid_to_runtime_filter) {
+        for (auto& filterid_to_desc : 
runtime_filter_params.rid_to_runtime_filter) {
+            int filter_id = filterid_to_desc.first;
+            const auto& target_iter = 
runtime_filter_params.rid_to_target_param.find(filter_id);
+            if (target_iter == 
runtime_filter_params.rid_to_target_param.end()) {
+                return Status::InternalError("runtime filter params meet 
error");
+            }
+            const auto& build_iter =
+                    
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+            if (build_iter == 
runtime_filter_params.runtime_filter_builder_num.end()) {
+                return Status::InternalError("runtime filter params meet 
error");
+            }
+            _init_with_desc(&filterid_to_desc.second, &query_options, 
&target_iter->second,
+                            build_iter->second);
         }
-        _init_with_desc(&filterid_to_desc.second, &query_options, 
&target_iter->second,
-                        build_iter->second);
     }
     return Status::OK();
 }
 
 // merge data
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* 
request,
-                                                 
butil::IOBufAsZeroCopyInputStream* attach_data) {
+                                                 
butil::IOBufAsZeroCopyInputStream* attach_data,
+                                                 bool opt_remote_rf) {
+    _opt_remote_rf = _opt_remote_rf && opt_remote_rf;
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
     {
+        int64_t start_merge = MonotonicMillis();
         std::lock_guard<std::mutex> guard(_filter_map_mutex);
         auto iter = _filter_map.find(std::to_string(request->filter_id()));
         VLOG_ROW << "recv filter id:" << request->filter_id() << " " << 
request->ShortDebugString();
@@ -220,70 +300,143 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
         // TODO: avoid log when we had acquired a lock
         VLOG_ROW << "merge size:" << merged_size << ":" << 
cntVal->producer_size;
         DCHECK_LE(merged_size, cntVal->producer_size);
+        _merge_timer += (MonotonicMillis() - start_merge);
         if (merged_size < cntVal->producer_size) {
             return Status::OK();
         }
     }
 
     if (merged_size == cntVal->producer_size) {
-        // prepare rpc context
-        using PPublishFilterRpcContext =
-                async_rpc_context<PPublishFilterRequest, 
PPublishFilterResponse>;
-        std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
-        rpc_contexts.reserve(cntVal->target_info.size());
-
-        butil::IOBuf request_attachment;
-
-        PPublishFilterRequest apply_request;
-        // serialize filter
-        void* data = nullptr;
-        int len = 0;
-        bool has_attachment = false;
-        RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, 
&len));
-        if (data != nullptr && len > 0) {
-            request_attachment.append(data, len);
-            has_attachment = true;
-        }
+        if (opt_remote_rf) {
+            DCHECK_GT(cntVal->targetv2_info.size(), 0);
+            DCHECK(cntVal->filter->is_bloomfilter());
+            // Optimize merging phase iff:
+            // 1. All BE has been upgraded (e.g. _opt_remote_rf)
+            // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
+            // 3. This filter is bloom filter (only bloom filter should be 
used for merging)
+            using PPublishFilterRpcContext =
+                    async_rpc_context<PPublishFilterRequestV2, 
PPublishFilterResponse>;
+            std::vector<std::unique_ptr<PPublishFilterRpcContext>> 
rpc_contexts;
+            rpc_contexts.reserve(cntVal->targetv2_info.size());
+
+            butil::IOBuf request_attachment;
+
+            PPublishFilterRequestV2 apply_request;
+            // serialize filter
+            void* data = nullptr;
+            int len = 0;
+            bool has_attachment = false;
+            RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, 
&len));
+            if (data != nullptr && len > 0) {
+                request_attachment.append(data, len);
+                has_attachment = true;
+            }
 
-        std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
-        for (size_t i = 0; i < targets.size(); i++) {
-            rpc_contexts.emplace_back(new PPublishFilterRpcContext);
-            size_t cur = rpc_contexts.size() - 1;
-            rpc_contexts[cur]->request = apply_request;
-            rpc_contexts[cur]->request.set_filter_id(request->filter_id());
-            
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
-                                                       request->is_pipeline());
-            *rpc_contexts[cur]->request.mutable_query_id() = 
request->query_id();
-            if (has_attachment) {
-                
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+            std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cntVal->targetv2_info;
+            for (size_t i = 0; i < targets.size(); i++) {
+                rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+                size_t cur = rpc_contexts.size() - 1;
+                rpc_contexts[cur]->request = apply_request;
+                rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+                
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+                                                           
request->is_pipeline());
+                rpc_contexts[cur]->request.set_merge_time(_merge_timer);
+                *rpc_contexts[cur]->request.mutable_query_id() = 
request->query_id();
+                if (has_attachment) {
+                    
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+                }
+                rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
+
+                // set fragment-id
+                for (size_t fid = 0; fid < 
targets[cur].target_fragment_instance_ids.size();
+                     fid++) {
+                    PUniqueId* cur_id = 
rpc_contexts[cur]->request.add_fragment_instance_ids();
+                    
cur_id->set_hi(targets[cur].target_fragment_instance_ids[fid].hi);
+                    
cur_id->set_lo(targets[cur].target_fragment_instance_ids[fid].lo);
+                }
+
+                std::shared_ptr<PBackendService_Stub> stub(
+                        
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                targets[i].target_fragment_instance_addr));
+                VLOG_NOTICE << "send filter " << 
rpc_contexts[cur]->request.filter_id()
+                            << " to:" << 
targets[i].target_fragment_instance_addr.hostname << ":"
+                            << targets[i].target_fragment_instance_addr.port
+                            << rpc_contexts[cur]->request.ShortDebugString();
+                if (stub == nullptr) {
+                    rpc_contexts.pop_back();
+                    continue;
+                }
+                stub->apply_filterv2(&rpc_contexts[cur]->cntl, 
&rpc_contexts[cur]->request,
+                                     &rpc_contexts[cur]->response, 
brpc::DoNothing());
             }
-            rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
-
-            // set fragment-id
-            auto request_fragment_id = 
rpc_contexts[cur]->request.mutable_fragment_id();
-            
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
-            
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
-
-            std::shared_ptr<PBackendService_Stub> stub(
-                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
-                            targets[i].target_fragment_instance_addr));
-            VLOG_NOTICE << "send filter " << 
rpc_contexts[cur]->request.filter_id()
-                        << " to:" << 
targets[i].target_fragment_instance_addr.hostname << ":"
-                        << targets[i].target_fragment_instance_addr.port
-                        << rpc_contexts[cur]->request.ShortDebugString();
-            if (stub == nullptr) {
-                rpc_contexts.pop_back();
-                continue;
+            for (auto& rpc_context : rpc_contexts) {
+                brpc::Join(rpc_context->cid);
+                if (rpc_context->cntl.Failed()) {
+                    LOG(WARNING) << "runtimefilter rpc err:" << 
rpc_context->cntl.ErrorText();
+                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context->cntl.remote_side());
+                }
             }
-            stub->apply_filter(&rpc_contexts[cur]->cntl, 
&rpc_contexts[cur]->request,
-                               &rpc_contexts[cur]->response, 
brpc::DoNothing());
-        }
-        for (auto& rpc_context : rpc_contexts) {
-            brpc::Join(rpc_context->cid);
-            if (rpc_context->cntl.Failed()) {
-                LOG(WARNING) << "runtimefilter rpc err:" << 
rpc_context->cntl.ErrorText();
-                ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
-                        rpc_context->cntl.remote_side());
+        } else {
+            // prepare rpc context
+            using PPublishFilterRpcContext =
+                    async_rpc_context<PPublishFilterRequest, 
PPublishFilterResponse>;
+            std::vector<std::unique_ptr<PPublishFilterRpcContext>> 
rpc_contexts;
+            rpc_contexts.reserve(cntVal->target_info.size());
+
+            butil::IOBuf request_attachment;
+
+            PPublishFilterRequest apply_request;
+            // serialize filter
+            void* data = nullptr;
+            int len = 0;
+            bool has_attachment = false;
+            RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, 
&len));
+            if (data != nullptr && len > 0) {
+                request_attachment.append(data, len);
+                has_attachment = true;
+            }
+
+            std::vector<TRuntimeFilterTargetParams>& targets = 
cntVal->target_info;
+            for (size_t i = 0; i < targets.size(); i++) {
+                rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+                size_t cur = rpc_contexts.size() - 1;
+                rpc_contexts[cur]->request = apply_request;
+                rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+                
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+                                                           
request->is_pipeline());
+                *rpc_contexts[cur]->request.mutable_query_id() = 
request->query_id();
+                if (has_attachment) {
+                    
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+                }
+                rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
+
+                // set fragment-id
+                auto request_fragment_id = 
rpc_contexts[cur]->request.mutable_fragment_id();
+                
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
+                
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
+
+                std::shared_ptr<PBackendService_Stub> stub(
+                        
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                targets[i].target_fragment_instance_addr));
+                VLOG_NOTICE << "send filter " << 
rpc_contexts[cur]->request.filter_id()
+                            << " to:" << 
targets[i].target_fragment_instance_addr.hostname << ":"
+                            << targets[i].target_fragment_instance_addr.port
+                            << rpc_contexts[cur]->request.ShortDebugString();
+                if (stub == nullptr) {
+                    rpc_contexts.pop_back();
+                    continue;
+                }
+                stub->apply_filter(&rpc_contexts[cur]->cntl, 
&rpc_contexts[cur]->request,
+                                   &rpc_contexts[cur]->response, 
brpc::DoNothing());
+            }
+            for (auto& rpc_context : rpc_contexts) {
+                brpc::Join(rpc_context->cid);
+                if (rpc_context->cntl.Failed()) {
+                    LOG(WARNING) << "runtimefilter rpc err:" << 
rpc_context->cntl.ErrorText();
+                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context->cntl.remote_side());
+                }
             }
         }
     }
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 09a760eff8..0a0a70ec7a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -41,11 +41,14 @@ class IOBufAsZeroCopyInputStream;
 
 namespace doris {
 class PPublishFilterRequest;
+class PPublishFilterRequestV2;
 class PMergeFilterRequest;
 class IRuntimeFilter;
 class MemTracker;
 class RuntimeState;
 enum class RuntimeFilterRole;
+class RuntimePredicateWrapper;
+class QueryContext;
 
 /// producer:
 /// Filter filter;
@@ -63,6 +66,8 @@ class RuntimeFilterMgr {
 public:
     RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
 
+    RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
+
     ~RuntimeFilterMgr();
 
     Status init();
@@ -100,12 +105,14 @@ private:
     std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
 
     RuntimeState* _state;
+    QueryContext* _query_ctx;
     std::unique_ptr<MemTracker> _tracker;
     ObjectPool _pool;
 
     TNetworkAddress _merge_addr;
 
     bool _has_merge_addr;
+    std::mutex _lock;
 };
 
 // controller -> <query-id, entity>
@@ -123,8 +130,8 @@ public:
                 const TQueryOptions& query_options);
 
     // handle merge rpc
-    Status merge(const PMergeFilterRequest* request,
-                 butil::IOBufAsZeroCopyInputStream* attach_data);
+    Status merge(const PMergeFilterRequest* request, 
butil::IOBufAsZeroCopyInputStream* attach_data,
+                 bool opt_remote_rf);
 
     UniqueId query_id() const { return _query_id; }
 
@@ -135,6 +142,7 @@ public:
         int producer_size;
         TRuntimeFilterDesc runtime_filter_desc;
         std::vector<doris::TRuntimeFilterTargetParams> target_info;
+        std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter;
         std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
         std::shared_ptr<ObjectPool> pool;
@@ -149,6 +157,11 @@ private:
                            const 
std::vector<doris::TRuntimeFilterTargetParams>* target_info,
                            const int producer_size);
 
+    Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+                           const TQueryOptions* query_options,
+                           const 
std::vector<doris::TRuntimeFilterTargetParamsV2>* target_info,
+                           const int producer_size);
+
     UniqueId _query_id;
     UniqueId _fragment_instance_id;
     // protect _filter_map
@@ -158,6 +171,8 @@ private:
     // filter-id -> val
     std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
     RuntimeState* _state;
+    bool _opt_remote_rf = true;
+    int64_t _merge_timer = 0;
 };
 
 // RuntimeFilterMergeController has a map query-id -> entity
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 512438c776..0dbc26d6ac 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -829,6 +829,33 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
     }
 }
 
+// Handles the apply_filterv2 RPC. The work is offered to _light_work_pool; when the pool
+// rejects the task, the RPC is answered right away with a CANCELLED status.
+void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* controller,
+                                          const ::doris::PPublishFilterRequestV2* request,
+                                          ::doris::PPublishFilterResponse* response,
+                                          ::google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        // Declared first so it is destroyed last, after the stream over the attachment is gone.
+        brpc::ClosureGuard closure_guard(done);
+        // Reference, not a copy: the stream below only reads the controller's attachment.
+        const auto& attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        VLOG_NOTICE << "rpc apply_filterv2 recv";
+        Status st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
+}
+
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 6cfbae4fa7..22042196e2 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -123,6 +123,10 @@ public:
                       const ::doris::PPublishFilterRequest* request,
                       ::doris::PPublishFilterResponse* response,
                       ::google::protobuf::Closure* done) override;
+    void apply_filterv2(::google::protobuf::RpcController* controller,
+                        const ::doris::PPublishFilterRequestV2* request,
+                        ::doris::PPublishFilterResponse* response,
+                        ::google::protobuf::Closure* done) override;
     void transmit_block(::google::protobuf::RpcController* controller,
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index f811775d69..67ad6d29c1 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -428,6 +428,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
             ADD_TIMER(_build_phase_profile, 
"BuildTableConvertToPartitionedTime");
     _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", 
TUnit::UNIT);
     _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, 
"BuildSideHashComputingTime");
+    _build_runtime_filter_timer = ADD_TIMER(_build_phase_profile, 
"BuildRuntimeFilterTime");
 
     // Probe phase
     auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", 
true, true);
@@ -441,7 +442,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
 
     _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
 
-    _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
+    _push_down_timer = ADD_TIMER(runtime_profile(), 
"PublishRuntimeFilterTime");
     _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
     _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", 
TUnit::UNIT);
     _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), 
"FilledBuckets", TUnit::UNIT);
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index eb5bf7d240..eef8b30b90 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -288,6 +288,7 @@ private:
     RuntimeProfile::Counter* _probe_side_output_timer;
     RuntimeProfile::Counter* _build_side_compute_hash_timer;
     RuntimeProfile::Counter* _build_side_merge_block_timer;
+    RuntimeProfile::Counter* _build_runtime_filter_timer;
 
     RuntimeProfile::Counter* _build_blocks_memory_usage;
     RuntimeProfile::Counter* _hash_table_memory_usage;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 6ff46427ed..d13d4fc9c8 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -316,10 +316,23 @@ Status VScanNode::_register_runtime_filter() {
     for (int i = 0; i < filter_size; ++i) {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
-        RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
-                RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id()));
-        
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-                                                                         
&runtime_filter));
+        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
+            DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && 
filter_desc.has_remote_targets);
+            // Optimize merging phase iff:
+            // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
+            // 2. This filter is bloom filter (only bloom filter should be 
used for merging)
+            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
+                    RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id(),
+                    false));
+            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
+                    filter_desc.filter_id, &runtime_filter));
+        } else {
+            RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
+                    RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id(),
+                    false));
+            
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+                                                                             
&runtime_filter));
+        }
         _runtime_filter_ctxs.emplace_back(runtime_filter);
         _runtime_filter_ready_flag.emplace_back(false);
     }
@@ -345,13 +358,6 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
     std::vector<VExpr*> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        // If all targets are local, scan node will use hash node's runtime 
filter, and we don't
-        // need to allocate memory again
-        if (runtime_filter->has_remote_target()) {
-            if (auto bf = runtime_filter->get_bloomfilter()) {
-                RETURN_IF_ERROR(bf->init_with_fixed_length());
-            }
-        }
         bool ready = runtime_filter->is_ready();
         if (!ready && wait) {
             ready = runtime_filter->await();
@@ -1329,7 +1335,8 @@ Status 
VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
             ++current_arrived_rf_num;
             continue;
         } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-            
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, 
_row_descriptor);
+            
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, 
_row_descriptor,
+                                                                        
_state);
             ++current_arrived_rf_num;
             _runtime_filter_ctxs[i].apply_mark = true;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 58d0086ed6..fe6180eff1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -190,8 +190,12 @@ public final class RuntimeFilter {
         tFilter.setIsBroadcastJoin(isBroadcastJoin);
         tFilter.setHasLocalTargets(hasLocalTargets);
         tFilter.setHasRemoteTargets(hasRemoteTargets);
+        boolean optRemoteRf = true;
         for (RuntimeFilterTarget target : targets) {
             tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), 
target.expr.treeToThrift());
+            // TODO: now only support SlotRef
+            optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType 
== TRuntimeFilterType.BLOOM
+                    && target.expr instanceof SlotRef;
         }
         tFilter.setType(runtimeFilterType);
         tFilter.setBloomFilterSizeBytes(filterSizeBytes);
@@ -199,6 +203,7 @@ public final class RuntimeFilter {
             tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
             tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
         }
+        tFilter.setOptRemoteRf(optRemoteRf);
         return tFilter;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e60e4122b2..8068c1a998 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -96,7 +96,7 @@ import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TRuntimeFilterParams;
-import org.apache.doris.thrift.TRuntimeFilterTargetParams;
+import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -3122,12 +3122,24 @@ public class Coordinator {
                 if 
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
                     for (Map.Entry<RuntimeFilterId, 
List<FRuntimeFilterTargetParam>> entry
                             : ridToTargetParam.entrySet()) {
-                        List<TRuntimeFilterTargetParams> targetParams = 
Lists.newArrayList();
+                        Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> 
targetParams = new HashMap<>();
                         for (FRuntimeFilterTargetParam targetParam : 
entry.getValue()) {
-                            targetParams.add(new 
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
-                                    targetParam.targetFragmentInstanceAddr));
+                            if 
(targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        
.add(targetParam.targetFragmentInstanceId);
+                            } else {
+                                
targetParams.put(targetParam.targetFragmentInstanceAddr,
+                                        new TRuntimeFilterTargetParamsV2());
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
+                                        = 
targetParam.targetFragmentInstanceAddr;
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        = new ArrayList<>();
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        
.add(targetParam.targetFragmentInstanceId);
+                            }
                         }
-                        
params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(),
 targetParams);
+                        
params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
+                                new 
ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
                     }
                     for (Map.Entry<RuntimeFilterId, Integer> entry : 
ridToBuilderNum.entrySet()) {
                         
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
@@ -3197,12 +3209,25 @@ public class Coordinator {
                 if 
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
                     for (Map.Entry<RuntimeFilterId, 
List<FRuntimeFilterTargetParam>> entry
                             : ridToTargetParam.entrySet()) {
-                        List<TRuntimeFilterTargetParams> targetParams = 
Lists.newArrayList();
+                        Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> 
targetParams = new HashMap<>();
                         for (FRuntimeFilterTargetParam targetParam : 
entry.getValue()) {
-                            targetParams.add(new 
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
-                                    targetParam.targetFragmentInstanceAddr));
+                            if 
(targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        
.add(targetParam.targetFragmentInstanceId);
+                            } else {
+                                
targetParams.put(targetParam.targetFragmentInstanceAddr,
+                                        new TRuntimeFilterTargetParamsV2());
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
+                                        = 
targetParam.targetFragmentInstanceAddr;
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        = new ArrayList<>();
+                                
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+                                        
.add(targetParam.targetFragmentInstanceId);
+                            }
                         }
-                        
localParams.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), 
targetParams);
+
+                        
localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
+                                new 
ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
                     }
                     for (Map.Entry<RuntimeFilterId, Integer> entry : 
ridToBuilderNum.entrySet()) {
                         
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index d41a670052..f66090a77b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -481,6 +481,7 @@ message PMergeFilterRequest {
     optional PBloomFilter bloom_filter = 6;
     optional PInFilter in_filter = 7;
     optional bool is_pipeline = 8;
+    optional bool opt_remote_rf = 9;
 };
 
 message PMergeFilterResponse {
@@ -498,6 +499,18 @@ message PPublishFilterRequest {
     optional bool is_pipeline = 8;
 };
 
+message PPublishFilterRequestV2 { // request of the apply_filterv2 rpc
+    required int32 filter_id = 1;
+    required PUniqueId query_id = 2;
+    repeated PUniqueId fragment_instance_ids = 3; // a list, so one request can name several instances
+    required PFilterType filter_type = 4;
+    optional PMinMaxFilter minmax_filter = 5;
+    optional PBloomFilter bloom_filter = 6;
+    optional PInFilter in_filter = 7;
+    optional bool is_pipeline = 8;
+    optional int64 merge_time = 9; // NOTE(review): presumably the merge duration; unit not shown here - confirm
+};
+
 message PPublishFilterResponse {
     required PStatus status = 1;
 };
@@ -639,6 +652,7 @@ service PBackendService {
     rpc rollback(PRollbackRequest) returns (PRollbackResult);
     rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
+    rpc apply_filterv2(PPublishFilterRequestV2) returns 
(PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 111718eb1b..aa7ffc24ae 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -237,6 +237,12 @@ struct TRuntimeFilterTargetParams {
   2: required Types.TNetworkAddress target_fragment_instance_addr
 }
 
+struct TRuntimeFilterTargetParamsV2 { // runtime filter targets grouped by target address
+  1: required list<Types.TUniqueId> target_fragment_instance_ids // every target instance at the address below
+  // The address of the instance where the fragment is expected to run
+  2: required Types.TNetworkAddress target_fragment_instance_addr
+}
+
 struct TRuntimeFilterParams {
   // Runtime filter merge instance address
   1: optional Types.TNetworkAddress runtime_filter_merge_addr
@@ -250,6 +256,8 @@ struct TRuntimeFilterParams {
 
   // Number of Runtime filter producers
   4: optional map<i32, i32> runtime_filter_builder_num
+
+  5: optional map<i32, list<TRuntimeFilterTargetParamsV2>> 
rid_to_target_paramv2
 }
 
 // Parameters for a single execution instance of a particular TPlanFragment
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0eaf1d61a0..3b332744a9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1001,6 +1001,8 @@ struct TRuntimeFilterDesc {
 
   // for bitmap filter
   11: optional bool bitmap_filter_not_in
+
+  12: optional bool opt_remote_rf;
 }
 
 struct TDataGenScanNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to