This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 90f9ca817aa [Refactor](opt) Opt rf and remove unless code (#30900)
90f9ca817aa is described below

commit 90f9ca817aab540df277c55432b46ad96808c034
Author: HappenLee <[email protected]>
AuthorDate: Fri Feb 16 20:32:37 2024 +0800

    [Refactor](opt) Opt rf and remove unless code (#30900)
    
    Opt rf and remove unless code
---
 be/src/exprs/runtime_filter.cpp                    | 62 +++++++++++-----------
 be/src/exprs/runtime_filter.h                      | 18 +++----
 be/src/exprs/runtime_filter_slots.h                | 60 ++++++++++-----------
 be/src/exprs/runtime_filter_slots_cross.h          | 13 ++---
 be/src/olap/push_handler.cpp                       |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 23 +++-----
 be/src/pipeline/exec/hashjoin_build_sink.h         |  3 --
 be/src/pipeline/exec/join_build_sink_operator.cpp  |  3 +-
 be/src/pipeline/exec/join_build_sink_operator.h    |  5 ++
 .../exec/nested_loop_join_build_operator.cpp       |  8 +--
 .../exec/nested_loop_join_build_operator.h         |  2 -
 be/src/pipeline/pipeline_fragment_context.cpp      |  3 +-
 be/src/pipeline/pipeline_fragment_context.h        |  4 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  6 +--
 be/src/runtime/fold_constant_executor.cpp          |  2 +-
 be/src/runtime/fragment_mgr.cpp                    | 35 +++++++-----
 be/src/runtime/plan_fragment_executor.cpp          |  5 +-
 be/src/runtime/runtime_filter_mgr.cpp              | 29 +++-------
 be/src/runtime/runtime_filter_mgr.h                |  8 +--
 be/src/runtime/runtime_state.cpp                   | 20 ++++---
 be/src/runtime/runtime_state.h                     | 10 ++--
 be/src/vec/exec/join/vhash_join_node.cpp           | 11 ++--
 be/src/vec/exec/join/vhash_join_node.h             | 10 +---
 be/src/vec/exec/join/vjoin_node_base.cpp           |  4 +-
 be/src/vec/exec/join/vjoin_node_base.h             |  7 ++-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 12 ++---
 be/src/vec/exec/join/vnested_loop_join_node.h      |  2 -
 be/src/vec/exec/runtime_filter_consumer.cpp        |  5 +-
 be/test/vec/exec/vwal_scanner_test.cpp             |  2 -
 .../org/apache/doris/planner/RuntimeFilter.java    | 19 +------
 .../main/java/org/apache/doris/qe/Coordinator.java |  6 +--
 31 files changed, 167 insertions(+), 232 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 47157cf74d2..ffb92520be7 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -735,7 +735,9 @@ public:
         _is_bloomfilter = true;
         // we won't use this class to insert or find any data
         // so any type is ok
-        
_context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
+        
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == 
INVALID_TYPE
+                                                                     ? 
PrimitiveType::TYPE_INT
+                                                                     : 
_column_return_type));
         return _context.bloom_filter_func->assign(data, 
bloom_filter->filter_length());
     }
 
@@ -950,12 +952,6 @@ vectorized::SharedRuntimeFilterContext& 
IRuntimeFilter::get_shared_context_ref()
     return _wrapper->_context;
 }
 
-void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
-    _wrapper->_filter_type = other->_wrapper->_filter_type;
-    _wrapper->_is_bloomfilter = other->is_bloomfilter();
-    _wrapper->_context = other->_wrapper->_context;
-}
-
 void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t 
start) {
     DCHECK(is_producer());
     _wrapper->insert_batch(column, start);
@@ -988,7 +984,6 @@ Status IRuntimeFilter::publish(bool publish_local) {
                 filter->signal();
             }
         }
-        return Status::OK();
     } else if (_has_local_target) {
         std::vector<IRuntimeFilter*> filters;
         
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
@@ -1112,6 +1107,10 @@ bool IRuntimeFilter::is_ready_or_timeout() {
     }
 }
 
+PrimitiveType IRuntimeFilter::column_type() const {
+    return _wrapper->column_type();
+}
+
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
     if (_enable_pipeline_exec) {
@@ -1249,16 +1248,12 @@ Status 
IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
 }
 
 Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
-                                      const UpdateRuntimeFilterParamsV2* 
param, ObjectPool* pool,
-                                      
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
-    int filter_type = param->request->filter_type();
-    PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
-    if (param->request->has_in_filter()) {
-        column_type = 
to_primitive_type(param->request->in_filter().column_type());
-    }
-    wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, 
get_type(filter_type),
-                                               param->request->filter_id()));
-
+                                      const UpdateRuntimeFilterParamsV2* param,
+                                      RuntimePredicateWrapper** wrapper) {
+    auto filter_type = param->request->filter_type();
+    PrimitiveType column_type = param->column_type;
+    *wrapper = param->pool->add(new RuntimePredicateWrapper(
+            state, param->pool, column_type, get_type(filter_type), 
param->request->filter_id()));
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
         DCHECK(param->request->has_in_filter());
@@ -1650,8 +1645,9 @@ bool IRuntimeFilter::is_bloomfilter() {
     return _wrapper->is_bloomfilter();
 }
 
-template <typename T>
-Status IRuntimeFilter::_update_filter(const T* param) {
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+    _profile->add_info_string("MergeTime", 
std::to_string(param->request->merge_time()) + " ms");
+
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
         const PInFilter in_filter = param->request->in_filter();
         set_ignored(in_filter.ignored_msg());
@@ -1665,19 +1661,25 @@ Status IRuntimeFilter::_update_filter(const T* param) {
     }
     this->signal();
 
-    _profile->add_info_string("MergeTime", 
std::to_string(param->request->merge_time()) + " ms");
     return Status::OK();
 }
 
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
-    return _update_filter(param);
-}
-
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
-                                     int64_t start_apply) {
+void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t 
merge_time,
+                                   int64_t start_apply) {
     _profile->add_info_string("UpdateTime",
                               std::to_string(MonotonicMillis() - start_apply) 
+ " ms");
-    return _update_filter(param);
+    _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms");
+    // prevent apply filter to not have right column_return_type remove
+    // the code in the future
+    if (_wrapper->column_type() != wrapper->column_type()) {
+        wrapper->_column_return_type = _wrapper->_column_return_type;
+    }
+    auto origin_type = _wrapper->get_real_type();
+    _wrapper = wrapper;
+    if (origin_type != _wrapper->get_real_type()) {
+        update_runtime_filter_type_to_profile();
+    }
+    this->signal();
 }
 
 Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>&
 probe_ctxs,
@@ -1691,8 +1693,8 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
            (is_string_type(probe_ctx->root()->type().type) &&
             is_string_type(_column_return_type)) ||
            _filter_type == RuntimeFilterType::BITMAP_FILTER)
-            << " prob_expr->root()->type().type: " << 
probe_ctx->root()->type().type
-            << " _column_return_type: " << _column_return_type
+            << " prob_expr->root()->type().type: " << 
int(probe_ctx->root()->type().type)
+            << " _column_return_type: " << int(_column_return_type)
             << " _filter_type: " << IRuntimeFilter::to_string(_filter_type);
 
     auto real_filter_type = get_real_type();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 6d69302c2ea..8fcc4cee17f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -157,13 +157,10 @@ struct UpdateRuntimeFilterParams {
 };
 
 struct UpdateRuntimeFilterParamsV2 {
-    UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req,
-                                butil::IOBufAsZeroCopyInputStream* data_stream,
-                                ObjectPool* obj_pool)
-            : request(req), data(data_stream), pool(obj_pool) {}
     const PPublishFilterRequestV2* request;
     butil::IOBufAsZeroCopyInputStream* data;
     ObjectPool* pool = nullptr;
+    PrimitiveType column_type = INVALID_TYPE;
 };
 
 struct MergeRuntimeFilterParams {
@@ -220,8 +217,6 @@ public:
 
     vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
 
-    void copy_from_other(IRuntimeFilter* other);
-
     // insert data to build filter
     void insert_batch(vectorized::ColumnPtr column, size_t start);
 
@@ -231,6 +226,8 @@ public:
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
+    PrimitiveType column_type() const;
+
     Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
                               std::vector<vectorized::VExprSPtr>& push_exprs, 
bool is_late_arrival);
 
@@ -276,20 +273,21 @@ public:
 
     Status merge_from(const RuntimePredicateWrapper* wrapper);
 
-    // for ut
     static Status create_wrapper(RuntimeFilterParamsContext* state,
                                  const MergeRuntimeFilterParams* param, 
ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
     static Status create_wrapper(RuntimeFilterParamsContext* state,
                                  const UpdateRuntimeFilterParams* param, 
ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
+
     static Status create_wrapper(RuntimeFilterParamsContext* state,
-                                 const UpdateRuntimeFilterParamsV2* param, 
ObjectPool* pool,
-                                 std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
+                                 const UpdateRuntimeFilterParamsV2* param,
+                                 RuntimePredicateWrapper** wrapper);
     void change_to_bloom_filter();
     Status init_bloom_filter(const size_t build_bf_cardinality);
     Status update_filter(const UpdateRuntimeFilterParams* param);
-    Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t 
start_apply);
+    void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t 
merge_time,
+                       int64_t start_apply);
 
     void set_ignored(const std::string& msg);
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 3dcf84ace08..7a738b8c06d 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -34,9 +34,9 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool 
is_global = false)
+            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
is_global = false)
             : _build_expr_context(build_expr_ctxs),
-              _runtime_filter_descs(runtime_filter_descs),
+              _runtime_filters(runtime_filters),
               _is_global(is_global) {}
 
     Status init(RuntimeState* state, int64_t hash_table_size) {
@@ -75,33 +75,28 @@ public:
 
         // ordered vector: IN, IN_OR_BLOOM, others.
         // so we can ignore other filter if IN Predicate exists.
-        std::vector<TRuntimeFilterDesc> 
sorted_runtime_filter_descs(_runtime_filter_descs);
-        auto compare_desc = [](TRuntimeFilterDesc& d1, TRuntimeFilterDesc& d2) 
{
-            if (d1.type == d2.type) {
+        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
+            if (d1->type() == d2->type()) {
                 return false;
-            } else if (d1.type == TRuntimeFilterType::IN) {
+            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
                 return true;
-            } else if (d2.type == TRuntimeFilterType::IN) {
+            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
                 return false;
-            } else if (d1.type == TRuntimeFilterType::IN_OR_BLOOM) {
+            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
                 return true;
-            } else if (d2.type == TRuntimeFilterType::IN_OR_BLOOM) {
+            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
                 return false;
             } else {
-                return d1.type < d2.type;
+                return d1->type() < d2->type();
             }
         };
-        std::sort(sorted_runtime_filter_descs.begin(), 
sorted_runtime_filter_descs.end(),
-                  compare_desc);
+        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
 
         // do not create 'in filter' when hash_table size over limit
         const auto max_in_num = state->runtime_filter_max_in_num();
         const bool over_max_in_num = (hash_table_size >= max_in_num);
 
-        for (auto& filter_desc : sorted_runtime_filter_descs) {
-            IRuntimeFilter* runtime_filter = nullptr;
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
-                                                                             
&runtime_filter));
+        for (auto* runtime_filter : _runtime_filters) {
             if (runtime_filter->expr_order() < 0 ||
                 runtime_filter->expr_order() >= _build_expr_context.size()) {
                 return Status::InternalError(
@@ -133,10 +128,10 @@ public:
                 bool exists_in_filter = 
has_in_filter[runtime_filter->expr_order()];
                 if (is_in_filter && over_max_in_num) {
                     VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter(in filter id " << 
filter_desc.filter_id
-                               << ") because: in_num(" << hash_table_size << 
") >= max_in_num("
-                               << max_in_num << ")";
-                    
RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id));
+                               << " ignore runtime filter(in filter id "
+                               << runtime_filter->filter_id() << ") because: 
in_num("
+                               << hash_table_size << ") >= max_in_num(" << 
max_in_num << ")";
+                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
                     continue;
                 } else if (!is_in_filter && exists_in_filter) {
                     // do not create 'bloom filter' and 'minmax filter' when 
'in filter' has created
@@ -144,15 +139,16 @@ public:
                     VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
                                << " ignore runtime filter("
                                << 
IRuntimeFilter::to_string(runtime_filter->type()) << " id "
-                               << filter_desc.filter_id << ") because: already 
exists in filter";
-                    
RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id));
+                               << runtime_filter->filter_id()
+                               << ") because: already exists in filter";
+                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
                     continue;
                 }
             } else if (is_in_filter && over_max_in_num) {
                 std::string msg = fmt::format(
                         "fragment instance {} ignore runtime filter(in filter 
id {}) because: "
                         "in_num({}) >= max_in_num({})",
-                        print_id(state->fragment_instance_id()), 
filter_desc.filter_id,
+                        print_id(state->fragment_instance_id()), 
runtime_filter->filter_id(),
                         hash_table_size, max_in_num);
                 RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
                 continue;
@@ -163,7 +159,7 @@ public:
                  !over_max_in_num)) {
                 has_in_filter[runtime_filter->expr_order()] = true;
             }
-            
_runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
         }
 
         return Status::OK();
@@ -171,8 +167,8 @@ public:
 
     void insert(const vectorized::Block* block) {
         for (int i = 0; i < _build_expr_context.size(); ++i) {
-            auto iter = _runtime_filters.find(i);
-            if (iter == _runtime_filters.end()) {
+            auto iter = _runtime_filters_map.find(i);
+            if (iter == _runtime_filters_map.end()) {
                 continue;
             }
 
@@ -186,7 +182,7 @@ public:
 
     // publish runtime filter
     Status publish(bool publish_local = false) {
-        for (auto& pair : _runtime_filters) {
+        for (auto& pair : _runtime_filters_map) {
             for (auto& filter : pair.second) {
                 RETURN_IF_ERROR(filter->publish(publish_local));
             }
@@ -195,7 +191,7 @@ public:
     }
 
     void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& it : _runtime_filters) {
+        for (auto& it : _runtime_filters_map) {
             for (auto& filter : it.second) {
                 context->runtime_filters[filter->filter_id()] = 
filter->get_shared_context_ref();
             }
@@ -203,7 +199,7 @@ public:
     }
 
     Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& it : _runtime_filters) {
+        for (auto& it : _runtime_filters_map) {
             for (auto& filter : it.second) {
                 auto filter_id = filter->filter_id();
                 auto ret = context->runtime_filters.find(filter_id);
@@ -216,14 +212,14 @@ public:
         return Status::OK();
     }
 
-    bool empty() { return _runtime_filters.empty(); }
+    bool empty() { return _runtime_filters_map.empty(); }
 
 private:
     const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
_build_expr_context;
-    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    std::vector<IRuntimeFilter*> _runtime_filters;
     const bool _is_global = false;
     // prob_contition index -> [IRuntimeFilter]
-    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
+    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
 };
 
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter_slots_cross.h 
b/be/src/exprs/runtime_filter_slots_cross.h
index 7b1a2063d15..1d496ddf557 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -34,17 +34,14 @@ namespace doris {
 // this class used in cross join node
 class VRuntimeFilterSlotsCross {
 public:
-    VRuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs,
+    VRuntimeFilterSlotsCross(const std::vector<IRuntimeFilter*>& 
runtime_filters,
                              const vectorized::VExprContextSPtrs& 
src_expr_ctxs)
-            : _runtime_filter_descs(runtime_filter_descs), 
filter_src_expr_ctxs(src_expr_ctxs) {}
+            : _runtime_filters(runtime_filters), 
filter_src_expr_ctxs(src_expr_ctxs) {}
 
     ~VRuntimeFilterSlotsCross() = default;
 
     Status init(RuntimeState* state) {
-        for (auto& filter_desc : _runtime_filter_descs) {
-            IRuntimeFilter* runtime_filter = nullptr;
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
-                                                                             
&runtime_filter));
+        for (auto* runtime_filter : _runtime_filters) {
             if (runtime_filter == nullptr) {
                 return Status::InternalError("runtime filter is nullptr");
             }
@@ -53,7 +50,6 @@ public:
                 runtime_filter->has_remote_target()) {
                 return Status::InternalError("cross join runtime filter has 
remote target");
             }
-            _runtime_filters.push_back(runtime_filter);
         }
         return Status::OK();
     }
@@ -85,9 +81,8 @@ public:
     bool empty() const { return _runtime_filters.empty(); }
 
 private:
-    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    const std::vector<IRuntimeFilter*>& _runtime_filters;
     const vectorized::VExprContextSPtrs filter_src_expr_ctxs;
-    std::vector<IRuntimeFilter*> _runtime_filters;
 };
 
 } // namespace doris
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 6c1e50be26f..3f489fc457c 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -371,7 +371,7 @@ Status PushBrokerReader::init() {
     TQueryOptions query_options;
     TQueryGlobals query_globals;
     _runtime_state = RuntimeState::create_unique(params, query_options, 
query_globals,
-                                                 ExecEnv::GetInstance());
+                                                 ExecEnv::GetInstance(), 
nullptr);
     DescriptorTbl* desc_tbl = nullptr;
     Status status = DescriptorTbl::create(_runtime_state->obj_pool(), 
_t_desc_tbl, &desc_tbl);
     if (UNLIKELY(!status.ok())) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 4597bf9876e..19de8fb7bdc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -97,14 +97,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
 
     // Hash Table Init
     _hash_table_init(state);
-
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                p._runtime_filter_descs[i], state->query_options(), 
_build_expr_ctxs.size() == 1,
-                p._use_global_rf, p._child_x->parallel_tasks()));
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
-                p._runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
+                p._runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i],
+                _build_expr_ctxs.size() == 1, p._use_global_rf, 
p._child_x->parallel_tasks()));
     }
 
     return Status::OK();
@@ -121,10 +118,6 @@ bool HashJoinBuildSinkLocalState::build_unique() const {
     return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
 }
 
-std::vector<TRuntimeFilterDesc>& 
HashJoinBuildSinkLocalState::runtime_filter_descs() const {
-    return _parent->cast<HashJoinBuildSinkOperatorX>()._runtime_filter_descs;
-}
-
 void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->short_circuit_for_probe =
@@ -386,9 +379,7 @@ 
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope
           _partition_exprs(tnode.__isset.distribute_expr_lists && 
!_is_broadcast_join
                                    ? tnode.distribute_expr_lists[1]
                                    : std::vector<TExpr> {}),
-          _use_global_rf(use_global_rf) {
-    _runtime_filter_descs = tnode.runtime_filters;
-}
+          _use_global_rf(use_global_rf) {}
 
 Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
     if (_is_broadcast_join) {
@@ -548,13 +539,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                                 __builtin_unreachable();
                             },
                             [&](auto&& arg) -> Status {
-                                if (_runtime_filter_descs.empty()) {
+                                if (local_state._runtime_filters.empty()) {
                                     return Status::OK();
                                 }
                                 local_state._runtime_filter_slots =
-                                        
std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs,
-                                                                              
_runtime_filter_descs,
-                                                                              
use_global_rf);
+                                        std::make_shared<VRuntimeFilterSlots>(
+                                                _build_expr_ctxs, 
local_state._runtime_filters,
+                                                use_global_rf);
 
                                 
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
                                         state, arg.hash_table->size()));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index c3d6038b3eb..56a651e4210 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,7 +70,6 @@ public:
     void init_short_circuit_for_probe();
 
     bool build_unique() const;
-    std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
     std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
 
     void add_hash_buckets_info(const std::string& info) const {
@@ -101,7 +100,6 @@ protected:
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
 
-    std::vector<IRuntimeFilter*> _runtime_filters;
     bool _should_build_hash_table = true;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
@@ -190,7 +188,6 @@ private:
     std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hashtable_controller;
 
     vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     const std::vector<TExpr> _partition_exprs;
 
     const bool _use_global_rf;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp 
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 6b930ff8a5e..1141acc650d 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -78,7 +78,8 @@ 
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
                         : tnode.hash_join_node.__isset.is_mark ? 
tnode.hash_join_node.is_mark
                                                                : false),
           _short_circuit_for_null_in_build_side(_join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
-                                                !_is_mark_join) {
+                                                !_is_mark_join),
+          _runtime_filter_descs(tnode.runtime_filters) {
     _init_join_op();
     if (_is_mark_join) {
         DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h 
b/be/src/pipeline/exec/join_build_sink_operator.h
index f7c1415d371..9cf5be80f80 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -33,6 +33,8 @@ class JoinBuildSinkLocalState : public 
PipelineXSinkLocalState<DependencyType> {
 public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
+    const std::vector<IRuntimeFilter*>& runtime_filters() const { return 
_runtime_filters; }
+
 protected:
     JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : PipelineXSinkLocalState<DependencyType>(parent, state) {}
@@ -43,6 +45,7 @@ protected:
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
     RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
+    std::vector<IRuntimeFilter*> _runtime_filters;
 };
 
 template <typename LocalStateType>
@@ -75,6 +78,8 @@ protected:
     // 2. In build phase, we stop materialize build side when we meet the 
first null value and set _has_null_in_build_side to true.
     // 3. In probe phase, if _has_null_in_build_side is true, join node 
returns empty block directly. Otherwise, probing will continue as the same as 
generic left anti join.
     const bool _short_circuit_for_null_in_build_side;
+
+    const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index aec93c66b62..c30bb5ad67c 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -40,24 +40,20 @@ Status 
NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, 
_filter_src_expr_ctxs[i]));
     }
+    _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                p._runtime_filter_descs[i], state->query_options()));
+                p._runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i]));
     }
     return Status::OK();
 }
 
-const std::vector<TRuntimeFilterDesc>& 
NestedLoopJoinBuildSinkLocalState::runtime_filter_descs() {
-    return 
_parent->cast<NestedLoopJoinBuildSinkOperatorX>()._runtime_filter_descs;
-}
-
 NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* 
pool,
                                                                    int 
operator_id,
                                                                    const 
TPlanNode& tnode,
                                                                    const 
DescriptorTbl& descs)
         : JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, 
operator_id, tnode,
                                                                     descs),
-          _runtime_filter_descs(tnode.runtime_filters),
           
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only
 &&
                                     
tnode.nested_loop_join_node.is_output_left_side_only),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 27f847cc00f..e89d6a9d0be 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -65,7 +65,6 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs();
     vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return 
_filter_src_expr_ctxs; }
     RuntimeProfile::Counter* runtime_filter_compute_timer() {
         return _runtime_filter_compute_timer;
@@ -115,7 +114,6 @@ private:
 
     vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
 
-    const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     const bool _is_output_left_side_only;
     RowDescriptor _row_descriptor;
 };
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 1fb50c9ea06..6c183a96f64 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -229,13 +229,12 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     // 1. init _runtime_state
     _runtime_state = RuntimeState::create_unique(
             local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
-            request.query_options, _query_ctx->query_globals, _exec_env);
+            request.query_options, _query_ctx->query_globals, _exec_env, 
_query_ctx.get());
     if (local_params.__isset.runtime_filter_params) {
         
_runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
     }
 
     _runtime_state->set_task_execution_context(shared_from_this());
-    _runtime_state->set_query_ctx(_query_ctx.get());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
     // TODO should be combine with plan_fragment_executor.prepare funciton
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index b8a3bf7e922..b247d0ce83f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -75,9 +75,7 @@ public:
 
     TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; 
}
 
-    RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
-        return _runtime_state.get();
-    }
+    RuntimeState* get_runtime_state() { return _runtime_state.get(); }
 
     virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId 
/*fragment_instance_id*/) {
         return _runtime_state->runtime_filter_mgr();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 13336ea7ea7..67c5da97063 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -182,8 +182,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     // 1. Set up the global runtime state.
     _runtime_state = RuntimeState::create_unique(request.query_id, 
request.fragment_id,
                                                  request.query_options, 
_query_ctx->query_globals,
-                                                 _exec_env);
-    _runtime_state->set_query_ctx(_query_ctx.get());
+                                                 _exec_env, _query_ctx.get());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -479,7 +478,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& 
runtime_state) {
             
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
-            runtime_state->set_query_ctx(_query_ctx.get());
             runtime_state->set_task_execution_context(shared_from_this());
             runtime_state->set_be_number(local_params.backend_num);
 
@@ -566,7 +564,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                 _task_runtime_states.push_back(RuntimeState::create_unique(
                         this, local_params.fragment_instance_id, 
request.query_id,
                         request.fragment_id, request.query_options, 
_query_ctx->query_globals,
-                        _exec_env));
+                        _exec_env, _query_ctx.get()));
                 auto& task_runtime_state = _task_runtime_states.back();
                 init_runtime_state(task_runtime_state);
                 auto cur_task_id = _total_tasks++;
diff --git a/be/src/runtime/fold_constant_executor.cpp 
b/be/src/runtime/fold_constant_executor.cpp
index 33fb034b60a..de1fd046882 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -136,7 +136,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& 
query_globals,
     fragment_params.params = params;
     fragment_params.protocol_version = PaloInternalServiceVersion::V1;
     _runtime_state = RuntimeState::create_unique(fragment_params.params, 
query_options,
-                                                 query_globals, 
ExecEnv::GetInstance());
+                                                 query_globals, 
ExecEnv::GetInstance(), nullptr);
     DescriptorTbl* desc_tbl = nullptr;
     Status status =
             DescriptorTbl::create(_runtime_state->obj_pool(), 
TDescriptorTable(), &desc_tbl);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c3d2d20e1c6..bac465c1fe8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -807,7 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             static_cast<void>(_runtimefilter_controller.add_entity(
                     params.local_params[i], params.query_id, 
params.query_options, &handler,
-                    
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
+                    
RuntimeFilterParamsContext::create(context->get_runtime_state())));
             context->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
             {
@@ -887,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             static_cast<void>(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, 
&handler,
-                    
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
+                    
RuntimeFilterParamsContext::create(context->get_runtime_state())));
             context->set_merge_controller_handler(handler);
 
             {
@@ -1366,20 +1366,29 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
             pool = &fragment_executor->get_query_ctx()->obj_pool;
         }
 
-        UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
-        int filter_id = request->filter_id();
+        // 1. get the target filters
         std::vector<IRuntimeFilter*> filters;
-        RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
+        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
 
-        IRuntimeFilter* first_filter = nullptr;
-        for (auto filter : filters) {
-            if (!first_filter) {
-                RETURN_IF_ERROR(filter->update_filter(&params, start_apply));
-                first_filter = filter;
-            } else {
-                filter->copy_from_other(first_filter);
+        // 2. create the filter wrapper to replace or ignore the target filters
+        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
+            const auto& in_filter = request->in_filter();
+
+            std::ranges::for_each(filters, [&in_filter](auto& filter) {
+                filter->set_ignored(in_filter.ignored_msg());
                 filter->signal();
-            }
+            });
+        } else if (!filters.empty()) {
+            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
+                                                filters[0]->column_type()};
+            RuntimePredicateWrapper* filter_wrapper = nullptr;
+            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(
+                    runtime_filter_mgr->get_runtime_filter_context_state(), 
&params,
+                    &filter_wrapper));
+
+            std::ranges::for_each(filters, [&](auto& filter) {
+                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+            });
         }
     }
 
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index cbeb38204b4..9040fa12040 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -126,9 +126,8 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
     // VLOG_CRITICAL << "request:\n" << 
apache::thrift::ThriftDebugString(request);
 
     const TQueryGlobals& query_globals = _query_ctx->query_globals;
-    _runtime_state =
-            RuntimeState::create_unique(params, request.query_options, 
query_globals, _exec_env);
-    _runtime_state->set_query_ctx(_query_ctx.get());
+    _runtime_state = RuntimeState::create_unique(params, 
request.query_options, query_globals,
+                                                 _exec_env, _query_ctx.get());
     _runtime_state->set_task_execution_context(shared_from_this());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 8281ae2ea6d..155bb430ecb 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -58,17 +58,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, 
RuntimeFilterParams
                                             
ExecEnv::GetInstance()->experimental_mem_tracker());
 }
 
-Status RuntimeFilterMgr::get_producer_filter(const int filter_id, 
IRuntimeFilter** target) {
-    std::lock_guard<std::mutex> l(_lock);
-    auto iter = _producer_map.find(filter_id);
-    if (iter == _producer_map.end()) {
-        return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", 
filter_id);
-    }
-
-    *target = iter->second;
-    return Status::OK();
-}
-
 Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
                                              std::vector<IRuntimeFilter*>& 
consumer_filters) {
     std::lock_guard<std::mutex> l(_lock);
@@ -101,10 +90,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
     }
 
     // TODO: union the remote opt and global two case as one case to one judge
-    bool remote_opt_or_global =
-            (desc.__isset.opt_remote_rf && desc.opt_remote_rf && 
desc.has_remote_targets &&
-             desc.type == TRuntimeFilterType::BLOOM) ||
-            is_global;
+    bool remote_opt_or_global = (desc.__isset.opt_remote_rf && 
desc.opt_remote_rf) || is_global;
 
     if (!has_exist) {
         IRuntimeFilter* filter;
@@ -122,6 +108,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
 
 Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& options,
+                                                  IRuntimeFilter** 
producer_filter,
                                                   bool build_bf_exactly, bool 
is_global,
                                                   int parallel_tasks) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -133,11 +120,10 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc
     if (iter != _producer_map.end()) {
         return Status::InvalidArgument("filter has registed");
     }
-    IRuntimeFilter* filter;
     RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
-                                           RuntimeFilterRole::PRODUCER, -1, 
&filter,
+                                           RuntimeFilterRole::PRODUCER, -1, 
producer_filter,
                                            build_bf_exactly, is_global, 
parallel_tasks));
-    _producer_map.emplace(key, filter);
+    _producer_map.emplace(key, *producer_filter);
     return Status::OK();
 }
 
@@ -196,7 +182,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* 
query_options,
         const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
         const int producer_size) {
-    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
     std::shared_ptr<RuntimeFilterCntlVal> cnt_val = 
std::make_shared<RuntimeFilterCntlVal>();
     // runtime_filter_desc and target will be released,
     // so we need to copy to cnt_val
@@ -206,9 +191,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cnt_val->pool.reset(new ObjectPool());
     cnt_val->filter = cnt_val->pool->add(
             new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, 
runtime_filter_desc));
-
     auto filter_id = runtime_filter_desc->filter_id;
     
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options));
+
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
     _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
     return Status::OK();
 }
@@ -303,7 +289,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
     if (merged_size == cnt_val->producer_size) {
         if (opt_remote_rf) {
             DCHECK_GT(cnt_val->targetv2_info.size(), 0);
-            DCHECK(cnt_val->filter->is_bloomfilter());
             // Optimize merging phase iff:
             // 1. All BE has been upgraded (e.g. _opt_remote_rf)
             // 2. FE has been upgraded (e.g. cnt_val->targetv2_info.size() > 0)
@@ -492,8 +477,6 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(QueryContext* que
     params->query_id.set_hi(query_ctx->query_id().hi);
     params->query_id.set_lo(query_ctx->query_id().lo);
 
-    // params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
-    // params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
     params->be_exec_version = query_ctx->be_exec_version();
     params->query_ctx = query_ctx;
     params->_obj_pool = &query_ctx->obj_pool;
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index de55e34fc1e..31552ba6230 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -73,15 +73,13 @@ public:
 
     Status get_consume_filters(const int filter_id, 
std::vector<IRuntimeFilter*>& consumer_filters);
 
-    Status get_producer_filter(const int filter_id, IRuntimeFilter** 
producer_filter);
-
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     int node_id, IRuntimeFilter** 
consumer_filter,
                                     bool build_bf_exactly = false, bool 
is_global = false);
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    bool build_bf_exactly = false, bool 
is_global = false,
-                                    int parallel_tasks = 0);
+                                    IRuntimeFilter** producer_filter, bool 
build_bf_exactly = false,
+                                    bool is_global = false, int parallel_tasks 
= 0);
 
     // update filter by remote
     Status update_filter(const PPublishFilterRequest* request,
@@ -91,6 +89,8 @@ public:
 
     Status get_merge_addr(TNetworkAddress* addr);
 
+    RuntimeFilterParamsContext* get_runtime_filter_context_state() const { 
return _state; }
+
 private:
     struct ConsumerFilterHolder {
         int node_id;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d84b86f2290..d25d914147b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -76,7 +76,7 @@ RuntimeState::RuntimeState(const TUniqueId& 
fragment_instance_id,
 
 RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
                            const TQueryOptions& query_options, const 
TQueryGlobals& query_globals,
-                           ExecEnv* exec_env)
+                           ExecEnv* exec_env, QueryContext* ctx)
         : _profile("Fragment " + 
print_id(fragment_exec_params.fragment_instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -93,7 +93,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr) {
+          _error_log_file(nullptr),
+          _query_ctx(ctx) {
     Status status =
             init(fragment_exec_params.fragment_instance_id, query_options, 
query_globals, exec_env);
     DCHECK(status.ok());
@@ -106,7 +107,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
 
 RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& 
query_id,
                            int32_t fragment_id, const TQueryOptions& 
query_options,
-                           const TQueryGlobals& query_globals, ExecEnv* 
exec_env)
+                           const TQueryGlobals& query_globals, ExecEnv* 
exec_env, QueryContext* ctx)
         : _profile("Fragment " + print_id(instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -125,7 +126,8 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, 
const TUniqueId& query_
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr) {
+          _error_log_file(nullptr),
+          _query_ctx(ctx) {
     [[maybe_unused]] auto status = init(instance_id, query_options, 
query_globals, exec_env);
     DCHECK(status.ok());
     _runtime_filter_mgr.reset(
@@ -135,7 +137,7 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, 
const TUniqueId& query_
 RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const 
TUniqueId& instance_id,
                            const TUniqueId& query_id, int32_t fragment_id,
                            const TQueryOptions& query_options, const 
TQueryGlobals& query_globals,
-                           ExecEnv* exec_env)
+                           ExecEnv* exec_env, QueryContext* ctx)
         : _profile("Fragment " + print_id(instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -155,14 +157,15 @@ 
RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId&
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr) {
+          _error_log_file(nullptr),
+          _query_ctx(ctx) {
     [[maybe_unused]] auto status = init(instance_id, query_options, 
query_globals, exec_env);
     DCHECK(status.ok());
 }
 
 RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
                            const TQueryOptions& query_options, const 
TQueryGlobals& query_globals,
-                           ExecEnv* exec_env)
+                           ExecEnv* exec_env, QueryContext* ctx)
         : _profile("PipelineX  " + std::to_string(fragment_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -181,7 +184,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, 
int32_t fragment_id,
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr) {
+          _error_log_file(nullptr),
+          _query_ctx(ctx) {
     // TODO: do we really need instance id?
     Status status = init(TUniqueId(), query_options, query_globals, exec_env);
     DCHECK(status.ok());
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 38053d3cb68..cdc7e83f042 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -70,20 +70,20 @@ public:
 
     RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
                  const TQueryOptions& query_options, const TQueryGlobals& 
query_globals,
-                 ExecEnv* exec_env);
+                 ExecEnv* exec_env, QueryContext* ctx);
 
     RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, 
int32 fragment_id,
                  const TQueryOptions& query_options, const TQueryGlobals& 
query_globals,
-                 ExecEnv* exec_env);
+                 ExecEnv* exec_env, QueryContext* ctx);
 
     // for only use in pipelineX
     RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& 
instance_id,
                  const TUniqueId& query_id, int32 fragment_id, const 
TQueryOptions& query_options,
-                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env, 
QueryContext* ctx);
 
     // Used by pipelineX. This runtime state is only used for setup.
     RuntimeState(const TUniqueId& query_id, int32 fragment_id, const 
TQueryOptions& query_options,
-                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env, 
QueryContext* ctx);
 
     // RuntimeState for executing expr in fe-support.
     RuntimeState(const TQueryGlobals& query_globals);
@@ -466,8 +466,6 @@ public:
         _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
     }
 
-    void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
-
     QueryContext* get_query_ctx() { return _query_ctx; }
 
     void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
tracker) {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 3ca828a7f35..67dffa4b203 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -92,7 +92,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& 
tnode, const Descr
           
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
                                         ? 
tnode.hash_join_node.hash_output_slot_ids
                                         : std::vector<SlotId> {}) {
-    _runtime_filter_descs = tnode.runtime_filters;
     _arena = std::make_shared<Arena>();
     _hash_table_variants = std::make_shared<HashTableVariants>();
     _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
@@ -180,12 +179,10 @@ Status HashJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     }
 #endif
 
-    _runtime_filters.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                _runtime_filter_descs[i], state->query_options(), 
_probe_expr_ctxs.size() == 1));
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
-                _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
+                _runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i],
+                _probe_expr_ctxs.size() == 1));
     }
 
     // init left/right output slots flags, only column of slot_id in 
_hash_output_slot_ids need
@@ -795,11 +792,11 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
                                   __builtin_unreachable();
                               },
                               [&](auto&& arg) -> Status {
-                                  if (_runtime_filter_descs.empty()) {
+                                  if (_runtime_filters.empty()) {
                                       return Status::OK();
                                   }
                                   _runtime_filter_slots = 
std::make_shared<VRuntimeFilterSlots>(
-                                          _build_expr_ctxs, 
_runtime_filter_descs);
+                                          _build_expr_ctxs, _runtime_filters);
 
                                   RETURN_IF_ERROR(_runtime_filter_slots->init(
                                           state, arg.hash_table->size()));
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index a017633e5ce..58451c360e6 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -53,9 +53,7 @@ template <typename T>
 struct HashCRC32;
 
 namespace doris {
-
 class ObjectPool;
-class IRuntimeFilter;
 class DescriptorTbl;
 class RuntimeState;
 
@@ -77,11 +75,11 @@ class HashJoinNode;
 template <typename Parent>
 Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* 
parent,
                                     bool is_global = false) {
-    if (parent->runtime_filter_descs().empty()) {
+    if (parent->runtime_filters().empty()) {
         return Status::OK();
     }
     parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
-            parent->_build_expr_ctxs, parent->runtime_filter_descs(), 
is_global);
+            parent->_build_expr_ctxs, parent->runtime_filters(), is_global);
 
     RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
 
@@ -236,7 +234,6 @@ public:
     DataTypes right_table_data_types() { return _right_table_data_types; }
     DataTypes left_table_data_types() { return _left_table_data_types; }
     bool build_unique() const { return _build_unique; }
-    std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return 
_runtime_filter_descs; }
     std::shared_ptr<vectorized::Arena> arena() { return _arena; }
 
 protected:
@@ -400,9 +397,6 @@ private:
     friend Status process_runtime_filter_build(RuntimeState* state, 
vectorized::Block* block,
                                                Parent* parent, bool is_global);
 
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-
-    std::vector<IRuntimeFilter*> _runtime_filters;
     std::atomic_bool _probe_open_finish = false;
     std::vector<int> _build_col_ids;
 };
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 3436209c4cd..6fab6b8b91f 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -79,7 +79,9 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const 
TPlanNode& tnode, const Des
                                 : tnode.hash_join_node.__isset.is_mark &&
                                           tnode.hash_join_node.is_mark),
           _short_circuit_for_null_in_build_side(_join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
-                                                !_is_mark_join) {
+                                                !_is_mark_join),
+          _runtime_filter_descs(tnode.runtime_filters) {
+    _runtime_filters.resize(_runtime_filter_descs.size());
     _init_join_op();
     if (_is_mark_join) {
         DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index 95d10a0a07d..0e6ac3c9837 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -35,7 +35,7 @@
 namespace doris {
 class ObjectPool;
 class RuntimeState;
-
+class IRuntimeFilter;
 } // namespace doris
 
 namespace doris::vectorized {
@@ -74,6 +74,8 @@ public:
 
     [[nodiscard]] bool can_terminate_early() override { return 
_short_circuit_for_probe; }
 
+    const std::vector<IRuntimeFilter*>& runtime_filters() const { return 
_runtime_filters; }
+
 protected:
     // Construct the intermediate blocks to store the results from join 
operation.
     void _construct_mutable_join_block();
@@ -147,6 +149,9 @@ protected:
     RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<IRuntimeFilter*> _runtime_filters;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 08d4c7d4487..150068096ba 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -34,7 +34,6 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
 #include "exec/exec_node.h"
-#include "exprs/runtime_filter.h"
 #include "exprs/runtime_filter_slots_cross.h"
 #include "gutil/integral_types.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -43,14 +42,12 @@
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/simd/bits.h"
-#include "vec/columns/column_const.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/assert_cast.h"
 #include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/exprs/vexpr.h"
@@ -65,10 +62,10 @@ namespace doris::vectorized {
 
 template <typename Parent>
 Status RuntimeFilterBuild<Parent>::operator()(RuntimeState* state) {
-    if (_parent->runtime_filter_descs().empty()) {
+    if (_parent->runtime_filters().empty()) {
         return Status::OK();
     }
-    VRuntimeFilterSlotsCross 
runtime_filter_slots(_parent->runtime_filter_descs(),
+    VRuntimeFilterSlotsCross runtime_filter_slots(_parent->runtime_filters(),
                                                   
_parent->filter_src_expr_ctxs());
 
     RETURN_IF_ERROR(runtime_filter_slots.init(state));
@@ -96,8 +93,7 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, 
const TPlanNode& tnod
           _matched_rows_done(false),
           _left_block_pos(0),
           _left_side_eos(false),
-          _old_version_flag(!tnode.__isset.nested_loop_join_node),
-          _runtime_filter_descs(tnode.runtime_filters) {
+          _old_version_flag(!tnode.__isset.nested_loop_join_node) {
     _left_block = Block::create_shared();
 }
 
@@ -123,7 +119,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
         RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                _runtime_filter_descs[i], state->query_options()));
+                _runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i]));
     }
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, 
_filter_src_expr_ctxs));
     return Status::OK();
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h 
b/be/src/vec/exec/join/vnested_loop_join_node.h
index 73264859274..a8021bb4251 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -97,7 +97,6 @@ public:
 
     std::shared_ptr<Block> get_left_block() { return _left_block; }
 
-    std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return 
_runtime_filter_descs; }
     VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
     RuntimeProfile::Counter* runtime_filter_compute_timer() {
         return _runtime_filter_compute_timer;
@@ -267,7 +266,6 @@ private:
 
     MutableColumns _dst_columns;
 
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     VExprContextSPtrs _filter_src_expr_ctxs;
     bool _is_output_left_side_only = false;
     bool _need_more_input_data = true;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 5e2d90bf62d..eba11dac45d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -53,10 +53,7 @@ Status RuntimeFilterConsumer::_register_runtime_filter(bool 
is_global) {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
         if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && 
filter_desc.has_remote_targets);
-            // Optimize merging phase iff:
-            // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
-            // 2. This filter is bloom filter (only bloom filter should be 
used for merging)
+            DCHECK(filter_desc.has_remote_targets);
             
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
                     filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
                     is_global));
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index 6f983ca5bf4..b47d7563454 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -41,7 +41,6 @@ public:
         _profile = _runtime_state.runtime_profile();
         _runtime_state.init_mem_trackers();
         static_cast<void>(_runtime_state.init(unique_id, query_options, 
query_globals, _env));
-        _runtime_state.set_query_ctx(query_ctx);
     }
     void init();
 
@@ -75,7 +74,6 @@ private:
     TUniqueId unique_id;
     TQueryOptions query_options;
     TQueryGlobals query_globals;
-    QueryContext* query_ctx = nullptr;
 };
 
 void VWalScannerTest::init_desc_table() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 58d4d3eabe3..ade087e2de0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -108,8 +108,6 @@ public final class RuntimeFilter {
 
     private boolean bitmapFilterNotIn = false;
 
-    private boolean useRemoteRfOpt = true;
-
     private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType;
 
     private boolean bloomFilterSizeCalculatedByNdv = false;
@@ -207,17 +205,6 @@ public final class RuntimeFilter {
         this.bitmapFilterNotIn = bitmapFilterNotIn;
     }
 
-    public void computeUseRemoteRfOpt() {
-        for (RuntimeFilterTarget target : targets) {
-            useRemoteRfOpt = useRemoteRfOpt && hasRemoteTargets && 
runtimeFilterType == TRuntimeFilterType.BLOOM
-                    && target.expr instanceof SlotRef;
-        }
-    }
-
-    public boolean getUseRemoteRfOpt() {
-        return useRemoteRfOpt;
-    }
-
     /**
      * Serializes a runtime filter to Thrift.
      */
@@ -229,12 +216,8 @@ public final class RuntimeFilter {
         tFilter.setIsBroadcastJoin(isBroadcastJoin);
         tFilter.setHasLocalTargets(hasLocalTargets);
         tFilter.setHasRemoteTargets(hasRemoteTargets);
-        boolean optRemoteRf = true;
         for (RuntimeFilterTarget target : targets) {
             tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), 
target.expr.treeToThrift());
-            // TODO: now only support SlotRef
-            optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType 
== TRuntimeFilterType.BLOOM
-                    && target.expr instanceof SlotRef;
         }
         tFilter.setType(runtimeFilterType);
         tFilter.setBloomFilterSizeBytes(filterSizeBytes);
@@ -245,7 +228,7 @@ public final class RuntimeFilter {
         if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) {
             tFilter.setMinMaxType(tMinMaxRuntimeFilterType);
         }
-        tFilter.setOptRemoteRf(optRemoteRf);
+        tFilter.setOptRemoteRf(hasRemoteTargets);
         
tFilter.setBloomFilterSizeCalculatedByNdv(bloomFilterSizeCalculatedByNdv);
         return tFilter;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5f4154cb2b2..784f63266c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3736,8 +3736,7 @@ public class Coordinator implements CoordInterface {
                             continue;
                         }
                         List<FRuntimeFilterTargetParam> fParams = 
ridToTargetParam.get(rf.getFilterId());
-                        rf.computeUseRemoteRfOpt();
-                        if (rf.getUseRemoteRfOpt()) {
+                        if (rf.hasRemoteTargets()) {
                             Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> 
targetParamsV2 = new HashMap<>();
                             for (FRuntimeFilterTargetParam targetParam : 
fParams) {
                                 if 
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
@@ -3870,8 +3869,7 @@ public class Coordinator implements CoordInterface {
                             continue;
                         }
                         List<FRuntimeFilterTargetParam> fParams = 
ridToTargetParam.get(rf.getFilterId());
-                        rf.computeUseRemoteRfOpt();
-                        if (rf.getUseRemoteRfOpt()) {
+                        if (rf.hasRemoteTargets()) {
                             Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> 
targetParamsV2 = new HashMap<>();
                             for (FRuntimeFilterTargetParam targetParam : 
fParams) {
                                 if 
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to