This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fef37  [improvement](runtime-filter) Support adaptive runtime 
filter(#7546) (#7645)
c1fef37 is described below

commit c1fef37399b2f7af051d8e3278e7911282c3f879
Author: 924060929 <[email protected]>
AuthorDate: Sun Jan 30 16:46:52 2022 +0800

    [improvement](runtime-filter) Support adaptive runtime filter(#7546) (#7645)
    
    Change 1: Support an adaptive runtime filter: IN_OR_BLOOM_FILTER
        The processing logic is
        If the number of rows in the right table < runtime_filter_max_in_num, 
then IN predicate will work
        If the number of rows in the right table >= runtime_filter_max_in_num, 
then Bloom filter can take effect
    
    Change 2: The default runtime filter is changed to filter: 
IN_OR_BLOOM_FILTER
---
 be/src/exprs/runtime_filter.cpp                    | 172 +++++++++++++-
 be/src/exprs/runtime_filter.h                      |  12 +-
 be/src/exprs/runtime_filter_slots.h                |  42 +++-
 be/test/exprs/runtime_filter_test.cpp              | 263 ++++++++++++++++++++-
 docs/en/administrator-guide/runtime-filter.md      |   6 +-
 docs/zh-CN/administrator-guide/runtime-filter.md   |   7 +-
 .../apache/doris/qe/RuntimeFilterTypeHelper.java   |   4 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   4 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  70 +++++-
 .../doris/planner/RuntimeFilterGeneratorTest.java  | 229 ++++++++++++++++--
 .../doris/qe/RuntimeFilterTypeHelperTest.java      |  29 ++-
 .../java/org/apache/doris/qe/VariableMgrTest.java  |   5 +
 gensrc/proto/internal_service.proto                |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 14 files changed, 788 insertions(+), 57 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8e0e55e..ea0aa26 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -189,6 +189,8 @@ PFilterType get_type(RuntimeFilterType type) {
         return PFilterType::BLOOM_FILTER;
     case RuntimeFilterType::MINMAX_FILTER:
         return PFilterType::MINMAX_FILTER;
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER:
+        return PFilterType::IN_OR_BLOOM_FILTER;
     default:
         return PFilterType::UNKNOW_FILTER;
     }
@@ -342,6 +344,12 @@ public:
             break;
         }
         case RuntimeFilterType::BLOOM_FILTER: {
+            _is_bloomfilter = true;
+            _bloomfilter_func.reset(create_bloom_filter(_tracker, 
_column_return_type));
+            return 
_bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+        }
+        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+            _hybrid_set.reset(create_set(_column_return_type));
             _bloomfilter_func.reset(create_bloom_filter(_tracker, 
_column_return_type));
             return 
_bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
         }
@@ -351,6 +359,22 @@ public:
         return Status::OK();
     }
 
+    void change_to_bloom_filter() {
+        CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
+                << "Can not change to bloom filter because of runtime filter 
type is "
+                << to_string(_filter_type);
+        _is_bloomfilter = true;
+        if (_hybrid_set->size() > 0) {
+            auto it = _hybrid_set->begin();
+            while (it->has_next()) {
+                _bloomfilter_func->insert(it->get_value());
+                it->next();
+            }
+            // release in filter
+            _hybrid_set.reset(create_set(_column_return_type));
+        }
+    }
+
     void insert(const void* data) {
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
@@ -368,6 +392,14 @@ public:
             _bloomfilter_func->insert(data);
             break;
         }
+        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+            if (_is_bloomfilter) {
+                _bloomfilter_func->insert(data);
+            } else {
+                _hybrid_set->insert(data);
+            }
+            break;
+        }
         default:
             DCHECK(false);
             break;
@@ -403,6 +435,16 @@ public:
         }
     }
 
+    RuntimeFilterType get_real_type() {
+        auto real_filter_type = _filter_type;
+        if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            real_filter_type = _is_bloomfilter
+                                       ? RuntimeFilterType::BLOOM_FILTER
+                                       : RuntimeFilterType::IN_FILTER;
+        }
+        return real_filter_type;
+    }
+
     template <class T>
     Status get_push_context(T* container, RuntimeState* state, ExprContext* 
prob_expr) {
         DCHECK(state != nullptr);
@@ -410,7 +452,8 @@ public:
         DCHECK(_pool != nullptr);
         DCHECK(prob_expr->root()->type().type == _column_return_type);
 
-        switch (_filter_type) {
+        auto real_filter_type = get_real_type();
+        switch (real_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
             if (!_is_ignored_in_filter) {
                 TTypeDesc type_desc = create_type_desc(_column_return_type);
@@ -468,16 +511,25 @@ public:
     }
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
-        DCHECK(_filter_type == wrapper->_filter_type);
-        if (_filter_type != wrapper->_filter_type) {
-            return Status::InvalidArgument("invalid filter type");
-        }
+        bool can_not_merge_in_or_bloom
+            = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                  (wrapper->_filter_type != RuntimeFilterType::IN_FILTER
+                   && wrapper->_filter_type != 
RuntimeFilterType::BLOOM_FILTER);
+
+        bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER
+                               && _filter_type != wrapper->_filter_type;
+
+        CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
+                << "fragment instance " << _fragment_instance_id.to_string()
+                << " can not merge runtime filter(id=" << _filter_id << "), 
current is filter type is "
+                << to_string(_filter_type) << ", other filter type is " << 
to_string(wrapper->_filter_type);
+
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
             if (_is_ignored_in_filter) {
                 break;
             } else if (wrapper->_is_ignored_in_filter) {
-                LOG(INFO) << "fragment instance " << 
_fragment_instance_id.to_string()
+                VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
                           << " ignore merge runtime filter(in filter id "
                           << _filter_id << ") because: " << 
*(wrapper->get_ignored_in_filter_msg());
 
@@ -490,17 +542,20 @@ public:
             // try insert set
             _hybrid_set->insert(wrapper->_hybrid_set.get());
             if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
+#ifdef VLOG_DEBUG_IS_ON
                 std::stringstream msg;
                 msg << "fragment instance " << 
_fragment_instance_id.to_string()
                     << " ignore merge runtime filter(in filter id "
                     << _filter_id << ") because: in_num(" << 
_hybrid_set->size()
                     << ") >= max_in_num(" << _max_in_num << ")";
                 _ignored_in_filter_msg = _pool->add(new 
std::string(msg.str()));
+#else
+                _ignored_in_filter_msg = _pool->add(new 
std::string("ignored"));
+#endif
                 _is_ignored_in_filter = true;
 
                 // release in filter
                 _hybrid_set.reset(create_set(_column_return_type));
-                LOG(INFO) << msg.str();
             }
             break;
         }
@@ -512,6 +567,53 @@ public:
             _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
             break;
         }
+        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+            auto real_filter_type = _is_bloomfilter
+                                       ? RuntimeFilterType::BLOOM_FILTER
+                                       : RuntimeFilterType::IN_FILTER;
+            if (real_filter_type == RuntimeFilterType::IN_FILTER) {
+                if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// in merge in
+                    CHECK(!wrapper->_is_ignored_in_filter)
+                            << "fragment instance " << 
_fragment_instance_id.to_string()
+                            << " can not ignore merge runtime filter(in filter 
id "
+                            << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
+                            << *(wrapper->get_ignored_in_filter_msg());
+                    _hybrid_set->insert(wrapper->_hybrid_set.get());
+                    if (_max_in_num >= 0 && _hybrid_set->size() >= 
_max_in_num) {
+                        VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
+                            << " change runtime filter to bloom filter(id=" << 
_filter_id
+                            << ") because: in_num(" << _hybrid_set->size()
+                            << ") >= max_in_num(" << _max_in_num << ")";
+                        change_to_bloom_filter();
+                    }
+                // in merge bloom filter
+                } else {
+                    VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
+                        << " change runtime filter to bloom filter(id=" << 
_filter_id
+                        << ") because: already exist a bloom filter";
+                    change_to_bloom_filter();
+                    _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+                }
+            } else {
+                if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// bloom filter merge in
+                    CHECK(!wrapper->_is_ignored_in_filter)
+                            << "fragment instance " << 
_fragment_instance_id.to_string()
+                            << " can not ignore merge runtime filter(in filter 
id "
+                            << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
+                            << *(wrapper->get_ignored_in_filter_msg());
+                    auto it = wrapper->_hybrid_set->begin();
+                    while (it->has_next()) {
+                        auto value = it->get_value();
+                        _bloomfilter_func->insert(value);
+                        it->next();
+                    }
+                // bloom filter merge bloom filter
+                } else {
+                    _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+                }
+            }
+            break;
+        }
         default:
             DCHECK(false);
             return Status::InternalError("unknown runtime filter");
@@ -524,7 +626,7 @@ public:
 
         PrimitiveType type = to_primitive_type(in_filter->column_type());
         if (in_filter->has_ignored_msg()) {
-            LOG(INFO) << "Ignore in filter because: " << 
in_filter->ignored_msg();
+            VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because: 
" << in_filter->ignored_msg();
             _is_ignored_in_filter = true;
             _ignored_in_filter_msg = _pool->add(new 
std::string(in_filter->ignored_msg()));
             return Status::OK();
@@ -625,6 +727,7 @@ public:
     // assign this filter by protobuf
     Status assign(const PBloomFilter* bloom_filter, const char* data) {
         DCHECK(_tracker != nullptr);
+        _is_bloomfilter = true;
         // we won't use this class to insert or find any data
         // so any type is ok
         _bloomfilter_func.reset(create_bloom_filter(_tracker, 
PrimitiveType::TYPE_INT));
@@ -766,6 +869,10 @@ public:
         }
     }
 
+    bool is_bloomfilter() const {
+        return _is_bloomfilter;
+    }
+
     bool is_ignored_in_filter() const {
         return _is_ignored_in_filter;
     }
@@ -791,6 +898,7 @@ private:
     std::unique_ptr<MinMaxFuncBase> _minmax_func;
     std::unique_ptr<HybridSetBase> _hybrid_set;
     std::unique_ptr<IBloomFilterFuncBase> _bloomfilter_func;
+    bool _is_bloomfilter = false;
     bool _is_ignored_in_filter = false;
     std::string *_ignored_in_filter_msg;
     UniqueId _fragment_instance_id;
@@ -828,6 +936,7 @@ Status IRuntimeFilter::publish() {
         DCHECK(status.ok());
         // push down
         std::swap(this->_wrapper, consumer_filter->_wrapper);
+        consumer_filter->update_runtime_filter_type_to_profile();
         consumer_filter->signal();
         return Status::OK();
     } else {
@@ -904,6 +1013,8 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
         _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
     } else if (desc->type == TRuntimeFilterType::IN) {
         _runtime_filter_type = RuntimeFilterType::IN_FILTER;
+    } else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
+        _runtime_filter_type = RuntimeFilterType::IN_OR_BLOOM_FILTER;
     } else {
         return Status::InvalidArgument("unknown filter type");
     }
@@ -961,6 +1072,14 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParams* param, Me
     return _create_wrapper(param, tracker, pool, wrapper);
 }
 
+void IRuntimeFilter::change_to_bloom_filter() {
+    auto origin_type = _wrapper->get_real_type();
+    _wrapper->change_to_bloom_filter();
+    if (origin_type != _wrapper->get_real_type()) {
+        update_runtime_filter_type_to_profile();
+    }
+}
+
 template <class T>
 Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, 
ObjectPool* pool,
                                        
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
@@ -995,6 +1114,16 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
     _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
     _effect_timer.reset(new 
ScopedTimer<MonotonicStopWatch>(_effect_time_cost));
     _effect_timer->start();
+
+    if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+        update_runtime_filter_type_to_profile();
+    }
+}
+
+void IRuntimeFilter::update_runtime_filter_type_to_profile() {
+    if (_profile.get() != nullptr) {
+        _profile->add_info_string("RealRuntimeFilterType", 
::doris::to_string(_wrapper->get_real_type()));
+    }
 }
 
 void IRuntimeFilter::set_push_down_profile() {
@@ -1010,11 +1139,15 @@ Status IRuntimeFilter::merge_from(const 
RuntimePredicateWrapper* wrapper) {
         set_ignored();
         set_ignored_msg(*(wrapper->get_ignored_in_filter_msg()));
     }
+    auto origin_type = _wrapper->get_real_type();
     Status status = _wrapper->merge(wrapper);
     if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
         set_ignored();
         set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg()));
     }
+    if (origin_type != _wrapper->get_real_type()) {
+        update_runtime_filter_type_to_profile();
+    }
     return status;
 }
 
@@ -1035,17 +1168,24 @@ void batch_copy(PInFilter* filter, 
HybridSetBase::IteratorBase* it,
 
 template <class T>
 Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) {
-    request->set_filter_type(get_type(_runtime_filter_type));
+    auto real_runtime_filter_type = _runtime_filter_type;
+    if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+        real_runtime_filter_type = _wrapper->is_bloomfilter()
+                                      ? RuntimeFilterType::BLOOM_FILTER
+                                      : RuntimeFilterType::IN_FILTER;
+    }
 
-    if (_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
+    request->set_filter_type(get_type(real_runtime_filter_type));
+
+    if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
         auto in_filter = request->mutable_in_filter();
         to_protobuf(in_filter);
-    } else if (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
+    } else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
         RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len));
         DCHECK(data != nullptr);
         request->mutable_bloom_filter()->set_filter_length(*len);
         request->mutable_bloom_filter()->set_always_true(false);
-    } else if (_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+    } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
         auto minmax_filter = request->mutable_minmax_filter();
         to_protobuf(minmax_filter);
     } else {
@@ -1231,6 +1371,10 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
     }
 }
 
+bool IRuntimeFilter::is_bloomfilter() {
+    return _wrapper->is_bloomfilter();
+}
+
 Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
         set_ignored();
@@ -1240,7 +1384,11 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
     }
     std::unique_ptr<RuntimePredicateWrapper> wrapper;
     RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _mem_tracker, _pool, 
&wrapper));
+    auto origin_type = _wrapper->get_real_type();
     RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
+    if (origin_type != _wrapper->get_real_type()) {
+        update_runtime_filter_type_to_profile();
+    }
     this->signal();
     return Status::OK();
 }
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 60a3c68..9df757f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -50,7 +50,8 @@ enum class RuntimeFilterType {
     UNKNOWN_FILTER = -1,
     IN_FILTER = 0,
     MINMAX_FILTER = 1,
-    BLOOM_FILTER = 2
+    BLOOM_FILTER = 2,
+    IN_OR_BLOOM_FILTER = 3
 };
 
 inline std::string to_string(RuntimeFilterType type) {
@@ -64,6 +65,9 @@ inline std::string to_string(RuntimeFilterType type) {
     case RuntimeFilterType::MINMAX_FILTER: {
         return std::string("minmax");
     }
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+        return std::string("in_or_bloomfilter");
+    }
     default:
         return std::string("UNKNOWN");
     }
@@ -193,6 +197,7 @@ public:
     static Status create_wrapper(const UpdateRuntimeFilterParams* param, 
MemTracker* tracker,
                                  ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
+    void change_to_bloom_filter();
     Status update_filter(const UpdateRuntimeFilterParams* param);
 
     void set_ignored() { _is_ignored = true; }
@@ -202,6 +207,9 @@ public:
 
     void set_ignored_msg(std::string &msg) { _ignored_msg = msg; }
 
+    // for ut
+    bool is_bloomfilter();
+
     // consumer should call before released
     Status consumer_close();
 
@@ -211,6 +219,8 @@ public:
 
     void init_profile(RuntimeProfile* parent_profile);
 
+    void update_runtime_filter_type_to_profile();
+
     void set_push_down_profile();
 
     void ready_for_publish();
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index e098adc..5326df4 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -60,7 +60,27 @@ public:
             runtime_filter->publish_finally();
         };
 
-        for (auto& filter_desc : _runtime_filter_descs) {
+        // ordered vector: IN, IN_OR_BLOOM, others.
+        // so we can ignore other filter if IN Predicate exists.
+        std::vector<TRuntimeFilterDesc> 
sorted_runtime_filter_descs(_runtime_filter_descs);
+        auto compare_desc = [](TRuntimeFilterDesc& d1, TRuntimeFilterDesc& d2) 
{
+            if (d1.type == d2.type) {
+                return false;
+            } else if (d1.type == TRuntimeFilterType::IN) {
+                return true;
+            } else if (d2.type == TRuntimeFilterType::IN) {
+                return false;
+            } else if (d1.type == TRuntimeFilterType::IN_OR_BLOOM) {
+                return true;
+            } else if (d2.type == TRuntimeFilterType::IN_OR_BLOOM) {
+                return false;
+            } else {
+                return d1.type < d2.type;
+            }
+        };
+        std::sort(sorted_runtime_filter_descs.begin(), 
sorted_runtime_filter_descs.end(), compare_desc);
+
+        for (auto& filter_desc : sorted_runtime_filter_descs) {
             IRuntimeFilter* runtime_filter = nullptr;
             
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
                                                                              
&runtime_filter));
@@ -74,6 +94,10 @@ public:
 
             bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
 
+            if (over_max_in_num && runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+                runtime_filter->change_to_bloom_filter();
+            }
+
             // Note:
             // In the case that exist *remote target* and in filter and other 
filter,
             // we must merge other filter whatever in filter is over the max 
num in current node,
@@ -85,7 +109,7 @@ public:
             if (!runtime_filter->has_remote_target()) {
                 bool exists_in_filter = 
has_in_filter[runtime_filter->expr_order()];
                 if (is_in_filter && over_max_in_num) {
-                    LOG(INFO) << "fragment instance " << 
print_id(state->fragment_instance_id())
+                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
                               << " ignore runtime filter(in filter id " << 
filter_desc.filter_id
                               << ") because: in_num(" << hash_table_size
                               << ") >= max_in_num(" << max_in_num << ")";
@@ -94,7 +118,7 @@ public:
                 } else if (!is_in_filter && exists_in_filter) {
                     // do not create 'bloom filter' and 'minmax filter' when 
'in filter' has created
                     // because in filter is exactly filter, so it is enough to 
filter data
-                    LOG(INFO) << "fragment instance " << 
print_id(state->fragment_instance_id())
+                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
                               << " ignore runtime filter(" << 
to_string(runtime_filter->type())
                               << " id " << filter_desc.filter_id
                               << ") because: already exists in filter";
@@ -102,14 +126,20 @@ public:
                     continue;
                 }
             } else if (is_in_filter && over_max_in_num) {
+#ifdef VLOG_DEBUG_IS_ON
                 std::string msg = fmt::format("fragment instance {} ignore 
runtime filter(in filter id {}) because: in_num({}) >= max_in_num({})",
-                  print_id(state->fragment_instance_id()), 
filter_desc.filter_id, hash_table_size, max_in_num);
+                                              
print_id(state->fragment_instance_id()), filter_desc.filter_id, 
hash_table_size, max_in_num);
                 ignore_remote_filter(runtime_filter, msg);
+#else
+                ignore_remote_filter(runtime_filter, "ignored");
+#endif
                 continue;
             }
 
-            has_in_filter[runtime_filter->expr_order()] =
-                    (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
+            if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER)
+                || (runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER && !over_max_in_num)) {
+                has_in_filter[runtime_filter->expr_order()] = true;
+            }
             
_runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
         }
 
diff --git a/be/test/exprs/runtime_filter_test.cpp 
b/be/test/exprs/runtime_filter_test.cpp
index 68cefab..6ca422f 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -174,7 +174,6 @@ TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
 }
 
 TEST_F(RuntimeFilterTest, runtime_filter_merge_in_filter_test) {
-    // size_t prob_index = 0;
     SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
     ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
     ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
@@ -223,7 +222,6 @@ TEST_F(RuntimeFilterTest, 
runtime_filter_merge_in_filter_test) {
 }
 
 TEST_F(RuntimeFilterTest, runtime_filter_ignore_in_filter_test) {
-    // size_t prob_index = 0;
     SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
     ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
     ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
@@ -271,6 +269,267 @@ TEST_F(RuntimeFilterTest, 
runtime_filter_ignore_in_filter_test) {
     }
 }
 
+TEST_F(RuntimeFilterTest, runtime_filter_in_or_bloom_filter_in_merge_in_test) {
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    TQueryOptions options;
+    options.runtime_filter_max_in_num = 3;
+
+    auto rows1 = create_rows(&_obj_pool, 1, 1);
+    auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+    IRuntimeFilter* runtime_filter =
+            create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter, build_expr_ctx, rows1);
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+    IRuntimeFilter* runtime_filter2 =
+            create_runtime_filter(TRuntimeFilterType::IN, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter2, build_expr_ctx, rows2);
+    ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(runtime_filter->is_ignored());
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+    // get expr context from filter
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
+    ASSERT_FALSE(expr_context_list.empty());
+
+    for (TupleRow& row : *rows1) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    for (TupleRow& row : *rows2) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_in_merge_in_upgrade_test) {
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    TQueryOptions options;
+    options.runtime_filter_max_in_num = 2;
+
+    auto rows1 = create_rows(&_obj_pool, 1, 1);
+    auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+    IRuntimeFilter* runtime_filter =
+            create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter, build_expr_ctx, rows1);
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+    IRuntimeFilter* runtime_filter2 =
+            create_runtime_filter(TRuntimeFilterType::IN, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter2, build_expr_ctx, rows2);
+    ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(runtime_filter->is_ignored());
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+    // get expr context from filter
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
+    ASSERT_FALSE(expr_context_list.empty());
+
+    for (TupleRow& row : *rows1) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    for (TupleRow& row : *rows2) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_in_merge_bloom_filter_upgrade_test) {
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    TQueryOptions options;
+    options.runtime_filter_max_in_num = 100;
+
+    auto rows1 = create_rows(&_obj_pool, 1, 1);
+    auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+    IRuntimeFilter* runtime_filter =
+            create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter, build_expr_ctx, rows1);
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+    IRuntimeFilter* runtime_filter2 =
+            create_runtime_filter(TRuntimeFilterType::BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter2, build_expr_ctx, rows2);
+    ASSERT_TRUE(runtime_filter2->is_bloomfilter());
+
+    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(runtime_filter->is_ignored());
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+    // get expr context from filter
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
+    ASSERT_FALSE(expr_context_list.empty());
+
+    for (TupleRow& row : *rows1) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    for (TupleRow& row : *rows2) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_bloom_filter_merge_in_test) {
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    TQueryOptions options;
+    options.runtime_filter_max_in_num = 3;
+
+    auto rows1 = create_rows(&_obj_pool, 1, 3);
+    auto rows2 = create_rows(&_obj_pool, 4, 4);
+
+    IRuntimeFilter* runtime_filter =
+            create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter, build_expr_ctx, rows1);
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+    runtime_filter->change_to_bloom_filter();
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+    IRuntimeFilter* runtime_filter2 =
+            create_runtime_filter(TRuntimeFilterType::IN, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter2, build_expr_ctx, rows2);
+    ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(runtime_filter->is_ignored());
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+    // get expr context from filter
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
+    ASSERT_FALSE(expr_context_list.empty());
+
+    for (TupleRow& row : *rows1) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    for (TupleRow& row : *rows2) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_bloom_filter_merge_bloom_filter_test) {
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    TQueryOptions options;
+    options.runtime_filter_max_in_num = 3;
+
+    auto rows1 = create_rows(&_obj_pool, 1, 3);
+    auto rows2 = create_rows(&_obj_pool, 4, 6);
+
+    IRuntimeFilter* runtime_filter =
+            create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter, build_expr_ctx, rows1);
+    ASSERT_FALSE(runtime_filter->is_bloomfilter());
+    runtime_filter->change_to_bloom_filter();
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+    IRuntimeFilter* runtime_filter2 =
+            create_runtime_filter(TRuntimeFilterType::BLOOM, &options, 
_runtime_stat.get(), &_obj_pool);
+    insert(runtime_filter2, build_expr_ctx, rows2);
+    ASSERT_TRUE(runtime_filter2->is_bloomfilter());
+
+    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(runtime_filter->is_ignored());
+    ASSERT_TRUE(runtime_filter->is_bloomfilter());
+//    
ASSERT_TRUE(runtime_filter->get_profile()->get_info_string("RealRuntimeFilterType")
 ==
+//                        
::doris::to_string(doris::RuntimeFilterType::BLOOM_FILTER);
+
+    // get expr context from filter
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
+    ASSERT_FALSE(expr_context_list.empty());
+
+    for (TupleRow& row : *rows1) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    for (TupleRow& row : *rows2) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
 } // namespace doris
 
 int main(int argc, char** argv) {
diff --git a/docs/en/administrator-guide/runtime-filter.md 
b/docs/en/administrator-guide/runtime-filter.md
index 549cc9e..5db59cc 100644
--- a/docs/en/administrator-guide/runtime-filter.md
+++ b/docs/en/administrator-guide/runtime-filter.md
@@ -91,7 +91,7 @@ For query options related to Runtime Filter, please refer to 
the following secti
 
 - The first query option is to adjust the type of Runtime Filter used. In most 
cases, you only need to adjust this option, and keep the other options as 
default.
 
-  - `runtime_filter_type`: Including Bloom Filter, MinMax Filter and IN 
predicate. By default, only IN predicate will be used conservatively. In some 
cases, the performance will be higher when both Bloom Filter, MinMax Filter and 
IN predicate are used at the same time.
+  - `runtime_filter_type`: Including Bloom Filter, MinMax Filter, IN predicate 
and IN_OR_BLOOM Filter. By default, only IN_OR_BLOOM Filter will be used. In 
some cases, the performance will be higher when both Bloom Filter, MinMax 
Filter and IN predicate are used at the same time.
 
 - Other query options usually only need to be further adjusted in certain 
specific scenarios to achieve the best results. Usually only after performance 
testing, optimize for resource-intensive, long enough running time and high 
enough frequency queries.
 
@@ -114,7 +114,7 @@ The query options are further explained below.
 #### 1.runtime_filter_type
 Type of Runtime Filter used.
 
-**Type**: Number (1, 2, 4) or the corresponding mnemonic string (IN, 
BLOOM_FILTER, MIN_MAX), the default is 1 (IN predicate), use multiple commas to 
separate, pay attention to the need to add quotation marks , Or add any number 
of types, for example:
+**Type**: Number (1, 2, 4, 8) or the corresponding mnemonic string (IN, 
BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER), the default is 8 (IN_OR_BLOOM 
FILTER), use multiple commas to separate, pay attention to the need to add 
quotation marks , Or add any number of types, for example:
 ```
 set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
 ```
@@ -125,6 +125,8 @@ set runtime_filter_type=7;
 
 **Precautions for use**
 
+- **IN or Bloom Filter**: According to the actual number of rows in the right 
table during execution, the system automatically determines whether to use IN 
predicate or Bloom Filter.
+    - By default, IN Predicate will be used when the number of data rows in 
the right table is less than 1024 (which can be adjusted by ` 
runtime_filter_max_in_num 'in the session variable). Otherwise, use bloom 
filter.
 - **Bloom Filter**: There is a certain misjudgment rate, which results in the 
filtered data being a little less than expected, but it will not cause the 
final result to be inaccurate. In most cases, Bloom Filter can improve 
performance or has no significant impact on performance, but in some cases 
Under circumstances will cause performance degradation.
     - Bloom Filter construction and application overhead is high, so when the 
filtering rate is low, or the amount of data in the left table is small, Bloom 
Filter may cause performance degradation.
     - At present, only the Key column of the left table can be pushed down to 
the storage engine if the Bloom Filter is applied, and the test results show 
that the performance of the Bloom Filter is often reduced when the Bloom Filter 
is not pushed down to the storage engine.
diff --git a/docs/zh-CN/administrator-guide/runtime-filter.md 
b/docs/zh-CN/administrator-guide/runtime-filter.md
index 6da7d0e..2e6d897 100644
--- a/docs/zh-CN/administrator-guide/runtime-filter.md
+++ b/docs/zh-CN/administrator-guide/runtime-filter.md
@@ -91,7 +91,7 @@ Runtime Filter主要用于优化针对大表的join,如果左表的数据量
 
 - 第一个查询选项是调整使用的Runtime Filter类型,大多数情况下,您只需要调整这一个选项,其他选项保持默认即可。
 
-  - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN 
predicate,默认会保守的只使用IN predicate,部分情况下同时使用Bloom Filter、MinMax Filter、IN 
predicate时性能更高。
+  - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN 
predicate、IN_OR_BLOOM Filter,默认会使用IN_OR_BLOOM Filter,部分情况下同时使用Bloom 
Filter、MinMax Filter、IN predicate时性能更高。
 
 - 其他查询选项通常仅在某些特定场景下,才需进一步调整以达到最优效果。通常只在性能测试后,针对资源密集型、运行耗时足够长且频率足够高的查询进行优化。
 
@@ -114,7 +114,7 @@ Runtime Filter主要用于优化针对大表的join,如果左表的数据量
 #### 1.runtime_filter_type
 使用的Runtime Filter类型。
 
-**类型**: 数字(1, 2, 4)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX),默认1(IN 
predicate),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
+**类型**: 数字(1, 2, 4, 8)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX, 
IN_OR_BLOOM_FILTER),默认8(IN_OR_BLOOM 
Filter),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
 ```
 set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
 ```
@@ -125,6 +125,8 @@ set runtime_filter_type=7;
 
 **使用注意事项**
 
+- **IN or Bloom Filter**: 根据右表在执行过程中的真实行数,由系统自动判断使用 IN predicate 还是 Bloom 
Filter
+  - 默认在右表数据行数少于1024时会使用IN 
predicate(可通过session变量中的`runtime_filter_max_in_num`调整,否则使用Bloom filter。
 - **Bloom Filter**: 有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确,在大部分情况下Bloom 
Filter都可以提升性能或对性能没有显著影响,但在部分情况下会导致性能降低。
     - Bloom Filter构建和应用的开销较高,所以当过滤率较低时,或者左表数据量较少时,Bloom Filter可能会导致性能降低。
     - 目前只有左表的Key列应用Bloom Filter才能下推到存储引擎,而测试结果显示Bloom Filter不下推到存储引擎时往往会导致性能降低。
@@ -138,7 +140,6 @@ set runtime_filter_type=7;
     - 默认只有右表数据行数少于1024才会下推(可通过session变量中的`runtime_filter_max_in_num`调整)。
     - 目前IN predicate已实现合并方法。
     - 当同时指定In 
predicate和其他filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因是In 
predicate是精确的过滤条件,即使没有其他filter也可以高效过滤,如果同时使用则其他filter会做无用功。目前仅在Runtime 
filter的生产者和消费者处于同一个fragment时才会有去除非in filter的逻辑。
-- **
 
 #### 2.runtime_filter_mode
 用于控制Runtime Filter在instance之间传输的范围。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
index 8806263..12dea2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
@@ -41,7 +41,8 @@ public class RuntimeFilterTypeHelper {
     private static final Logger LOG = 
LogManager.getLogger(RuntimeFilterTypeHelper.class);
 
     public final static long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue() |
-            TRuntimeFilterType.BLOOM.getValue() | 
TRuntimeFilterType.MIN_MAX.getValue());
+            TRuntimeFilterType.BLOOM.getValue() | 
TRuntimeFilterType.MIN_MAX.getValue() |
+            TRuntimeFilterType.IN_OR_BLOOM.getValue());
 
     private final static Map<String, Long> varValueSet = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
@@ -49,6 +50,7 @@ public class RuntimeFilterTypeHelper {
         varValueSet.put("IN", (long) TRuntimeFilterType.IN.getValue());
         varValueSet.put("BLOOM_FILTER", (long) 
TRuntimeFilterType.BLOOM.getValue());
         varValueSet.put("MIN_MAX", (long) 
TRuntimeFilterType.MIN_MAX.getValue());
+        varValueSet.put("IN_OR_BLOOM_FILTER", (long) 
TRuntimeFilterType.IN_OR_BLOOM.getValue());
     }
 
     // convert long type variable value to string type that user can read
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c4bfdc2..c49134f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -385,9 +385,9 @@ public class SessionVariable implements Serializable, 
Writable {
     private int runtimeFilterWaitTimeMs = 1000;
     @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM)
     private int runtimeFiltersMaxNum = 10;
-    // Set runtimeFilterType to IN filter
+    // Set runtimeFilterType to IN_OR_BLOOM filter
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_TYPE)
-    private int runtimeFilterType = 1;
+    private int runtimeFilterType = 8;
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
     private int runtimeFilterMaxInNum = 1024;
     @VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 001fab6..63c5de5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1367,23 +1367,15 @@ public class QueryPlanTest {
 
         queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 
<=> t2.k1";
         Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterMode", "LOCAL");
-        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 7);
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 15);
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
         Assert.assertFalse(explainString.contains("runtime filter"));
 
         queryStr = "explain select * from jointest as a where k1 = (select 
count(1) from jointest as b where a.k1 = b.k1);";
         Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterMode", "GLOBAL");
-        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 7);
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 15);
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
         Assert.assertFalse(explainString.contains("runtime filter"));
-
-        // support merge in filter, and forbidden implicit conversion to bloom 
filter
-        queryStr = "explain select * from jointest t2 join [shuffle] jointest 
t1 where t1.k1 = t2.k1";
-        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterMode", "GLOBAL");
-        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", TRuntimeFilterType.IN.getValue());
-        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
-        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`"));
-        Assert.assertFalse(explainString.contains("runtime filters: 
RF000[bloom] -> `t2`.`k1`"));
     }
 
     @Test
@@ -1430,6 +1422,64 @@ public class QueryPlanTest {
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
         Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`"));
         Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 8);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[in_or_bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 9);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 10);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[bloom] <- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[bloom] -> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 11);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[in_or_bloom] -> `t2`.`k1`"));
+
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 12);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[min_max] <- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[min_max] -> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 13);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`, RF002[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`, RF002[in_or_bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 14);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[bloom] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`, RF002[in_or_bloom] <- 
`t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[bloom] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`, RF002[in_or_bloom] -> 
`t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", 15);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`, 
RF003[in_or_bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`, 
RF003[in_or_bloom] -> `t2`.`k1`"));
+
+        // support merge in filter, and forbidden implicit conversion to bloom 
filter
+        queryStr = "explain select * from jointest t2 join [shuffle] jointest 
t1 where t1.k1 = t2.k1";
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterMode", "GLOBAL");
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", TRuntimeFilterType.IN.getValue());
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
-> `t2`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] 
<- `t1`.`k1`"));
+        Assert.assertFalse(explainString.contains("runtime filters: 
RF000[bloom] -> `t2`.`k1`"));
+        Assert.assertFalse(explainString.contains("runtime filters: 
RF000[bloom] <- `t1`.`k1`"));
+
+        queryStr = "explain select * from jointest t2 join [shuffle] jointest 
t1 where t1.k1 = t2.k1";
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterMode", "GLOBAL");
+        Deencapsulation.setField(connectContext.getSessionVariable(), 
"runtimeFilterType", TRuntimeFilterType.IN_OR_BLOOM.getValue());
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[in_or_bloom] -> `t2`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: 
RF000[in_or_bloom] <- `t1`.`k1`"));
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
index e5c361f..e03063c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
@@ -148,23 +148,25 @@ public class RuntimeFilterGeneratorTest {
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
                 result = "GLOBAL";
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
-                result = 7;
+                result = 15;
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
-        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
-        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
-        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
-        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
-        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
         Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
                 , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
                         ", RF001[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
-                        ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+                        ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                        ", RF003[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
         Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
                 , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
                         ", RF001[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
-                        ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+                        ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                        ", RF003[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
 
         clearRuntimeFilterState();
         new Expectations() {
@@ -174,19 +176,21 @@ public class RuntimeFilterGeneratorTest {
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
-        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
-        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
-        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
-        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
-        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
         Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
                 , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
                         ", RF001[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
-                        ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+                        ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                        ", RF003[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
         Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
                 , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
                         ", RF001[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
-                        ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+                        ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                        ", RF003[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
 
         clearRuntimeFilterState();
         new Expectations() {
@@ -211,7 +215,7 @@ public class RuntimeFilterGeneratorTest {
         new Expectations() {
             {
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
-                result = 8;
+                result = 16;
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
@@ -350,6 +354,197 @@ public class RuntimeFilterGeneratorTest {
         Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
         Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
         Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 7;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 8;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 9;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 10;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 11;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 12;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 13;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 14;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 15;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+            , "RF000[in] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF003[in_or_bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+        Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+            , "RF000[in] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[min_max] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF003[in_or_bloom] -> 
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+        
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+        
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+        Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+        Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+        Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
     }
 
     @Test(expected = IllegalStateException.class)
@@ -370,7 +565,7 @@ public class RuntimeFilterGeneratorTest {
         new Expectations() {
             {
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
-                result = 8;
+                result = 16;
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
index d0ce41c..17a8851 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
@@ -50,6 +50,30 @@ public class RuntimeFilterTypeHelperTest {
         runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX";
         Assert.assertEquals(new Long(7L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
 
+        runtimeFilterType = "IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(8L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(9L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "BLOOM_FILTER,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(10L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,BLOOM_FILTER,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(11L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "MIN_MAX,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(12L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,MIN_MAX,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(13L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "BLOOM_FILTER,MIN_MAX,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(14L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX,IN_OR_BLOOM_FILTER";
+        Assert.assertEquals(new Long(15L), 
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
         long runtimeFilterTypeValue = 0L;
         Assert.assertEquals("", 
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
 
@@ -61,6 +85,9 @@ public class RuntimeFilterTypeHelperTest {
 
         runtimeFilterTypeValue = 7L;
         Assert.assertEquals("BLOOM_FILTER,IN,MIN_MAX", 
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+
+        runtimeFilterTypeValue = 15L;
+        Assert.assertEquals("BLOOM_FILTER,IN,IN_OR_BLOOM_FILTER,MIN_MAX", 
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
     }
 
     @Test(expected = DdlException.class)
@@ -71,7 +98,7 @@ public class RuntimeFilterTypeHelperTest {
 
     @Test(expected = DdlException.class)
     public void testInvalidDecode() throws DdlException {
-        RuntimeFilterTypeHelper.decode(10L);
+        RuntimeFilterTypeHelper.decode(16L);
         Assert.fail("No exception throws");
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index fc8582f..074ab86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -146,6 +146,11 @@ public class VariableMgrTest {
         executor.execute();
         Assert.assertEquals(2L, 
ctx.getSessionVariable().getRuntimeFilterType());
 
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
runtime_filter_type ='IN_OR_BLOOM_FILTER'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(8L, 
ctx.getSessionVariable().getRuntimeFilterType());
+
         // Get from name
         SysVariableDesc desc = new SysVariableDesc("exec_mem_limit");
         Assert.assertEquals(var.getMaxExecMemByte() + "", 
VariableMgr.getValue(var, desc));
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index b78a64e..d01a5fe 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -360,6 +360,7 @@ enum PFilterType {
     BLOOM_FILTER = 1;
     MINMAX_FILTER = 2;
     IN_FILTER = 3;
+    IN_OR_BLOOM_FILTER = 4;
 };
 
 message PMergeFilterRequest {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2ca3630..0fb74a5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -704,6 +704,7 @@ enum TRuntimeFilterType {
   IN = 1
   BLOOM = 2
   MIN_MAX = 4
+  IN_OR_BLOOM = 8
 }
 
 // Specification of a runtime filter.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to