This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c1fef37 [improvement](runtime-filter) Support adaptive runtime
filter(#7546) (#7645)
c1fef37 is described below
commit c1fef37399b2f7af051d8e3278e7911282c3f879
Author: 924060929 <[email protected]>
AuthorDate: Sun Jan 30 16:46:52 2022 +0800
[improvement](runtime-filter) Support adaptive runtime filter(#7546) (#7645)
Change 1: Support an adaptive runtime filter type: IN_OR_BLOOM_FILTER.
The processing logic is:
- If the number of rows in the right table < runtime_filter_max_in_num,
then an IN predicate is used.
- If the number of rows in the right table >= runtime_filter_max_in_num,
then a Bloom filter is used instead.
Change 2: The default runtime filter type is changed to
IN_OR_BLOOM_FILTER.
---
be/src/exprs/runtime_filter.cpp | 172 +++++++++++++-
be/src/exprs/runtime_filter.h | 12 +-
be/src/exprs/runtime_filter_slots.h | 42 +++-
be/test/exprs/runtime_filter_test.cpp | 263 ++++++++++++++++++++-
docs/en/administrator-guide/runtime-filter.md | 6 +-
docs/zh-CN/administrator-guide/runtime-filter.md | 7 +-
.../apache/doris/qe/RuntimeFilterTypeHelper.java | 4 +-
.../java/org/apache/doris/qe/SessionVariable.java | 4 +-
.../org/apache/doris/planner/QueryPlanTest.java | 70 +++++-
.../doris/planner/RuntimeFilterGeneratorTest.java | 229 ++++++++++++++++--
.../doris/qe/RuntimeFilterTypeHelperTest.java | 29 ++-
.../java/org/apache/doris/qe/VariableMgrTest.java | 5 +
gensrc/proto/internal_service.proto | 1 +
gensrc/thrift/PlanNodes.thrift | 1 +
14 files changed, 788 insertions(+), 57 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8e0e55e..ea0aa26 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -189,6 +189,8 @@ PFilterType get_type(RuntimeFilterType type) {
return PFilterType::BLOOM_FILTER;
case RuntimeFilterType::MINMAX_FILTER:
return PFilterType::MINMAX_FILTER;
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER:
+ return PFilterType::IN_OR_BLOOM_FILTER;
default:
return PFilterType::UNKNOW_FILTER;
}
@@ -342,6 +344,12 @@ public:
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
+ _is_bloomfilter = true;
+ _bloomfilter_func.reset(create_bloom_filter(_tracker,
_column_return_type));
+ return
_bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+ }
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ _hybrid_set.reset(create_set(_column_return_type));
_bloomfilter_func.reset(create_bloom_filter(_tracker,
_column_return_type));
return
_bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
}
@@ -351,6 +359,22 @@ public:
return Status::OK();
}
+ void change_to_bloom_filter() {
+ CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
+ << "Can not change to bloom filter because of runtime filter
type is "
+ << to_string(_filter_type);
+ _is_bloomfilter = true;
+ if (_hybrid_set->size() > 0) {
+ auto it = _hybrid_set->begin();
+ while (it->has_next()) {
+ _bloomfilter_func->insert(it->get_value());
+ it->next();
+ }
+ // release in filter
+ _hybrid_set.reset(create_set(_column_return_type));
+ }
+ }
+
void insert(const void* data) {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
@@ -368,6 +392,14 @@ public:
_bloomfilter_func->insert(data);
break;
}
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ if (_is_bloomfilter) {
+ _bloomfilter_func->insert(data);
+ } else {
+ _hybrid_set->insert(data);
+ }
+ break;
+ }
default:
DCHECK(false);
break;
@@ -403,6 +435,16 @@ public:
}
}
+ RuntimeFilterType get_real_type() {
+ auto real_filter_type = _filter_type;
+ if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ real_filter_type = _is_bloomfilter
+ ? RuntimeFilterType::BLOOM_FILTER
+ : RuntimeFilterType::IN_FILTER;
+ }
+ return real_filter_type;
+ }
+
template <class T>
Status get_push_context(T* container, RuntimeState* state, ExprContext*
prob_expr) {
DCHECK(state != nullptr);
@@ -410,7 +452,8 @@ public:
DCHECK(_pool != nullptr);
DCHECK(prob_expr->root()->type().type == _column_return_type);
- switch (_filter_type) {
+ auto real_filter_type = get_real_type();
+ switch (real_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_is_ignored_in_filter) {
TTypeDesc type_desc = create_type_desc(_column_return_type);
@@ -468,16 +511,25 @@ public:
}
Status merge(const RuntimePredicateWrapper* wrapper) {
- DCHECK(_filter_type == wrapper->_filter_type);
- if (_filter_type != wrapper->_filter_type) {
- return Status::InvalidArgument("invalid filter type");
- }
+ bool can_not_merge_in_or_bloom
+ = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ (wrapper->_filter_type != RuntimeFilterType::IN_FILTER
+ && wrapper->_filter_type !=
RuntimeFilterType::BLOOM_FILTER);
+
+ bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER
+ && _filter_type != wrapper->_filter_type;
+
+ CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
+ << "fragment instance " << _fragment_instance_id.to_string()
+ << " can not merge runtime filter(id=" << _filter_id << "),
current is filter type is "
+ << to_string(_filter_type) << ", other filter type is " <<
to_string(wrapper->_filter_type);
+
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (_is_ignored_in_filter) {
break;
} else if (wrapper->_is_ignored_in_filter) {
- LOG(INFO) << "fragment instance " <<
_fragment_instance_id.to_string()
+ VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
<< " ignore merge runtime filter(in filter id "
<< _filter_id << ") because: " <<
*(wrapper->get_ignored_in_filter_msg());
@@ -490,17 +542,20 @@ public:
// try insert set
_hybrid_set->insert(wrapper->_hybrid_set.get());
if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
+#ifdef VLOG_DEBUG_IS_ON
std::stringstream msg;
msg << "fragment instance " <<
_fragment_instance_id.to_string()
<< " ignore merge runtime filter(in filter id "
<< _filter_id << ") because: in_num(" <<
_hybrid_set->size()
<< ") >= max_in_num(" << _max_in_num << ")";
_ignored_in_filter_msg = _pool->add(new
std::string(msg.str()));
+#else
+ _ignored_in_filter_msg = _pool->add(new
std::string("ignored"));
+#endif
_is_ignored_in_filter = true;
// release in filter
_hybrid_set.reset(create_set(_column_return_type));
- LOG(INFO) << msg.str();
}
break;
}
@@ -512,6 +567,53 @@ public:
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
break;
}
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ auto real_filter_type = _is_bloomfilter
+ ? RuntimeFilterType::BLOOM_FILTER
+ : RuntimeFilterType::IN_FILTER;
+ if (real_filter_type == RuntimeFilterType::IN_FILTER) {
+ if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// in merge in
+ CHECK(!wrapper->_is_ignored_in_filter)
+ << "fragment instance " <<
_fragment_instance_id.to_string()
+ << " can not ignore merge runtime filter(in filter
id "
+ << wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
+ << *(wrapper->get_ignored_in_filter_msg());
+ _hybrid_set->insert(wrapper->_hybrid_set.get());
+ if (_max_in_num >= 0 && _hybrid_set->size() >=
_max_in_num) {
+ VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
+ << " change runtime filter to bloom filter(id=" <<
_filter_id
+ << ") because: in_num(" << _hybrid_set->size()
+ << ") >= max_in_num(" << _max_in_num << ")";
+ change_to_bloom_filter();
+ }
+ // in merge bloom filter
+ } else {
+ VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
+ << " change runtime filter to bloom filter(id=" <<
_filter_id
+ << ") because: already exist a bloom filter";
+ change_to_bloom_filter();
+ _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+ }
+ } else {
+ if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// bloom filter merge in
+ CHECK(!wrapper->_is_ignored_in_filter)
+ << "fragment instance " <<
_fragment_instance_id.to_string()
+ << " can not ignore merge runtime filter(in filter
id "
+ << wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
+ << *(wrapper->get_ignored_in_filter_msg());
+ auto it = wrapper->_hybrid_set->begin();
+ while (it->has_next()) {
+ auto value = it->get_value();
+ _bloomfilter_func->insert(value);
+ it->next();
+ }
+ // bloom filter merge bloom filter
+ } else {
+ _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+ }
+ }
+ break;
+ }
default:
DCHECK(false);
return Status::InternalError("unknown runtime filter");
@@ -524,7 +626,7 @@ public:
PrimitiveType type = to_primitive_type(in_filter->column_type());
if (in_filter->has_ignored_msg()) {
- LOG(INFO) << "Ignore in filter because: " <<
in_filter->ignored_msg();
+ VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because:
" << in_filter->ignored_msg();
_is_ignored_in_filter = true;
_ignored_in_filter_msg = _pool->add(new
std::string(in_filter->ignored_msg()));
return Status::OK();
@@ -625,6 +727,7 @@ public:
// assign this filter by protobuf
Status assign(const PBloomFilter* bloom_filter, const char* data) {
DCHECK(_tracker != nullptr);
+ _is_bloomfilter = true;
// we won't use this class to insert or find any data
// so any type is ok
_bloomfilter_func.reset(create_bloom_filter(_tracker,
PrimitiveType::TYPE_INT));
@@ -766,6 +869,10 @@ public:
}
}
+ bool is_bloomfilter() const {
+ return _is_bloomfilter;
+ }
+
bool is_ignored_in_filter() const {
return _is_ignored_in_filter;
}
@@ -791,6 +898,7 @@ private:
std::unique_ptr<MinMaxFuncBase> _minmax_func;
std::unique_ptr<HybridSetBase> _hybrid_set;
std::unique_ptr<IBloomFilterFuncBase> _bloomfilter_func;
+ bool _is_bloomfilter = false;
bool _is_ignored_in_filter = false;
std::string *_ignored_in_filter_msg;
UniqueId _fragment_instance_id;
@@ -828,6 +936,7 @@ Status IRuntimeFilter::publish() {
DCHECK(status.ok());
// push down
std::swap(this->_wrapper, consumer_filter->_wrapper);
+ consumer_filter->update_runtime_filter_type_to_profile();
consumer_filter->signal();
return Status::OK();
} else {
@@ -904,6 +1013,8 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
_runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
} else if (desc->type == TRuntimeFilterType::IN) {
_runtime_filter_type = RuntimeFilterType::IN_FILTER;
+ } else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
+ _runtime_filter_type = RuntimeFilterType::IN_OR_BLOOM_FILTER;
} else {
return Status::InvalidArgument("unknown filter type");
}
@@ -961,6 +1072,14 @@ Status IRuntimeFilter::create_wrapper(const
UpdateRuntimeFilterParams* param, Me
return _create_wrapper(param, tracker, pool, wrapper);
}
+void IRuntimeFilter::change_to_bloom_filter() {
+ auto origin_type = _wrapper->get_real_type();
+ _wrapper->change_to_bloom_filter();
+ if (origin_type != _wrapper->get_real_type()) {
+ update_runtime_filter_type_to_profile();
+ }
+}
+
template <class T>
Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
@@ -995,6 +1114,16 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
_await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
_effect_timer.reset(new
ScopedTimer<MonotonicStopWatch>(_effect_time_cost));
_effect_timer->start();
+
+ if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ update_runtime_filter_type_to_profile();
+ }
+}
+
+void IRuntimeFilter::update_runtime_filter_type_to_profile() {
+ if (_profile.get() != nullptr) {
+ _profile->add_info_string("RealRuntimeFilterType",
::doris::to_string(_wrapper->get_real_type()));
+ }
}
void IRuntimeFilter::set_push_down_profile() {
@@ -1010,11 +1139,15 @@ Status IRuntimeFilter::merge_from(const
RuntimePredicateWrapper* wrapper) {
set_ignored();
set_ignored_msg(*(wrapper->get_ignored_in_filter_msg()));
}
+ auto origin_type = _wrapper->get_real_type();
Status status = _wrapper->merge(wrapper);
if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
set_ignored();
set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg()));
}
+ if (origin_type != _wrapper->get_real_type()) {
+ update_runtime_filter_type_to_profile();
+ }
return status;
}
@@ -1035,17 +1168,24 @@ void batch_copy(PInFilter* filter,
HybridSetBase::IteratorBase* it,
template <class T>
Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) {
- request->set_filter_type(get_type(_runtime_filter_type));
+ auto real_runtime_filter_type = _runtime_filter_type;
+ if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ real_runtime_filter_type = _wrapper->is_bloomfilter()
+ ? RuntimeFilterType::BLOOM_FILTER
+ : RuntimeFilterType::IN_FILTER;
+ }
- if (_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
+ request->set_filter_type(get_type(real_runtime_filter_type));
+
+ if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
auto in_filter = request->mutable_in_filter();
to_protobuf(in_filter);
- } else if (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
+ } else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len));
DCHECK(data != nullptr);
request->mutable_bloom_filter()->set_filter_length(*len);
request->mutable_bloom_filter()->set_always_true(false);
- } else if (_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+ } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
auto minmax_filter = request->mutable_minmax_filter();
to_protobuf(minmax_filter);
} else {
@@ -1231,6 +1371,10 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
}
}
+bool IRuntimeFilter::is_bloomfilter() {
+ return _wrapper->is_bloomfilter();
+}
+
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
if (param->request->has_in_filter() &&
param->request->in_filter().has_ignored_msg()) {
set_ignored();
@@ -1240,7 +1384,11 @@ Status IRuntimeFilter::update_filter(const
UpdateRuntimeFilterParams* param) {
}
std::unique_ptr<RuntimePredicateWrapper> wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _mem_tracker, _pool,
&wrapper));
+ auto origin_type = _wrapper->get_real_type();
RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
+ if (origin_type != _wrapper->get_real_type()) {
+ update_runtime_filter_type_to_profile();
+ }
this->signal();
return Status::OK();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 60a3c68..9df757f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -50,7 +50,8 @@ enum class RuntimeFilterType {
UNKNOWN_FILTER = -1,
IN_FILTER = 0,
MINMAX_FILTER = 1,
- BLOOM_FILTER = 2
+ BLOOM_FILTER = 2,
+ IN_OR_BLOOM_FILTER = 3
};
inline std::string to_string(RuntimeFilterType type) {
@@ -64,6 +65,9 @@ inline std::string to_string(RuntimeFilterType type) {
case RuntimeFilterType::MINMAX_FILTER: {
return std::string("minmax");
}
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ return std::string("in_or_bloomfilter");
+ }
default:
return std::string("UNKNOWN");
}
@@ -193,6 +197,7 @@ public:
static Status create_wrapper(const UpdateRuntimeFilterParams* param,
MemTracker* tracker,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
+ void change_to_bloom_filter();
Status update_filter(const UpdateRuntimeFilterParams* param);
void set_ignored() { _is_ignored = true; }
@@ -202,6 +207,9 @@ public:
void set_ignored_msg(std::string &msg) { _ignored_msg = msg; }
+ // for ut
+ bool is_bloomfilter();
+
// consumer should call before released
Status consumer_close();
@@ -211,6 +219,8 @@ public:
void init_profile(RuntimeProfile* parent_profile);
+ void update_runtime_filter_type_to_profile();
+
void set_push_down_profile();
void ready_for_publish();
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index e098adc..5326df4 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -60,7 +60,27 @@ public:
runtime_filter->publish_finally();
};
- for (auto& filter_desc : _runtime_filter_descs) {
+ // ordered vector: IN, IN_OR_BLOOM, others.
+ // so we can ignore other filter if IN Predicate exists.
+ std::vector<TRuntimeFilterDesc>
sorted_runtime_filter_descs(_runtime_filter_descs);
+ auto compare_desc = [](TRuntimeFilterDesc& d1, TRuntimeFilterDesc& d2)
{
+ if (d1.type == d2.type) {
+ return false;
+ } else if (d1.type == TRuntimeFilterType::IN) {
+ return true;
+ } else if (d2.type == TRuntimeFilterType::IN) {
+ return false;
+ } else if (d1.type == TRuntimeFilterType::IN_OR_BLOOM) {
+ return true;
+ } else if (d2.type == TRuntimeFilterType::IN_OR_BLOOM) {
+ return false;
+ } else {
+ return d1.type < d2.type;
+ }
+ };
+ std::sort(sorted_runtime_filter_descs.begin(),
sorted_runtime_filter_descs.end(), compare_desc);
+
+ for (auto& filter_desc : sorted_runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
&runtime_filter));
@@ -74,6 +94,10 @@ public:
bool is_in_filter = (runtime_filter->type() ==
RuntimeFilterType::IN_FILTER);
+ if (over_max_in_num && runtime_filter->type() ==
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ runtime_filter->change_to_bloom_filter();
+ }
+
// Note:
// In the case that exist *remote target* and in filter and other
filter,
// we must merge other filter whatever in filter is over the max
num in current node,
@@ -85,7 +109,7 @@ public:
if (!runtime_filter->has_remote_target()) {
bool exists_in_filter =
has_in_filter[runtime_filter->expr_order()];
if (is_in_filter && over_max_in_num) {
- LOG(INFO) << "fragment instance " <<
print_id(state->fragment_instance_id())
+ VLOG_DEBUG << "fragment instance " <<
print_id(state->fragment_instance_id())
<< " ignore runtime filter(in filter id " <<
filter_desc.filter_id
<< ") because: in_num(" << hash_table_size
<< ") >= max_in_num(" << max_in_num << ")";
@@ -94,7 +118,7 @@ public:
} else if (!is_in_filter && exists_in_filter) {
// do not create 'bloom filter' and 'minmax filter' when
'in filter' has created
// because in filter is exactly filter, so it is enough to
filter data
- LOG(INFO) << "fragment instance " <<
print_id(state->fragment_instance_id())
+ VLOG_DEBUG << "fragment instance " <<
print_id(state->fragment_instance_id())
<< " ignore runtime filter(" <<
to_string(runtime_filter->type())
<< " id " << filter_desc.filter_id
<< ") because: already exists in filter";
@@ -102,14 +126,20 @@ public:
continue;
}
} else if (is_in_filter && over_max_in_num) {
+#ifdef VLOG_DEBUG_IS_ON
std::string msg = fmt::format("fragment instance {} ignore
runtime filter(in filter id {}) because: in_num({}) >= max_in_num({})",
- print_id(state->fragment_instance_id()),
filter_desc.filter_id, hash_table_size, max_in_num);
+
print_id(state->fragment_instance_id()), filter_desc.filter_id,
hash_table_size, max_in_num);
ignore_remote_filter(runtime_filter, msg);
+#else
+ ignore_remote_filter(runtime_filter, "ignored");
+#endif
continue;
}
- has_in_filter[runtime_filter->expr_order()] =
- (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
+ if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER)
+ || (runtime_filter->type() ==
RuntimeFilterType::IN_OR_BLOOM_FILTER && !over_max_in_num)) {
+ has_in_filter[runtime_filter->expr_order()] = true;
+ }
_runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
}
diff --git a/be/test/exprs/runtime_filter_test.cpp
b/be/test/exprs/runtime_filter_test.cpp
index 68cefab..6ca422f 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -174,7 +174,6 @@ TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
}
TEST_F(RuntimeFilterTest, runtime_filter_merge_in_filter_test) {
- // size_t prob_index = 0;
SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
@@ -223,7 +222,6 @@ TEST_F(RuntimeFilterTest,
runtime_filter_merge_in_filter_test) {
}
TEST_F(RuntimeFilterTest, runtime_filter_ignore_in_filter_test) {
- // size_t prob_index = 0;
SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
@@ -271,6 +269,267 @@ TEST_F(RuntimeFilterTest,
runtime_filter_ignore_in_filter_test) {
}
}
+TEST_F(RuntimeFilterTest, runtime_filter_in_or_bloom_filter_in_merge_in_test) {
+ SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+ ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+ ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+ TQueryOptions options;
+ options.runtime_filter_max_in_num = 3;
+
+ auto rows1 = create_rows(&_obj_pool, 1, 1);
+ auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+ IRuntimeFilter* runtime_filter =
+ create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter, build_expr_ctx, rows1);
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+ IRuntimeFilter* runtime_filter2 =
+ create_runtime_filter(TRuntimeFilterType::IN, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter2, build_expr_ctx, rows2);
+ ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+ Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+ ASSERT_TRUE(status.ok());
+ ASSERT_FALSE(runtime_filter->is_ignored());
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+ // get expr context from filter
+
+ std::list<ExprContext*> expr_context_list;
+ ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
+ ASSERT_FALSE(expr_context_list.empty());
+
+ for (TupleRow& row : *rows1) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+ for (TupleRow& row : *rows2) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+
+ // test null
+ for (ExprContext* ctx : expr_context_list) {
+ TupleRow row;
+ row._tuples[0] = nullptr;
+ ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+ }
+}
+
+TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_in_merge_in_upgrade_test) {
+ SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+ ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+ ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+ TQueryOptions options;
+ options.runtime_filter_max_in_num = 2;
+
+ auto rows1 = create_rows(&_obj_pool, 1, 1);
+ auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+ IRuntimeFilter* runtime_filter =
+ create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter, build_expr_ctx, rows1);
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+ IRuntimeFilter* runtime_filter2 =
+ create_runtime_filter(TRuntimeFilterType::IN, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter2, build_expr_ctx, rows2);
+ ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+ Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+ ASSERT_TRUE(status.ok());
+ ASSERT_FALSE(runtime_filter->is_ignored());
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+ // get expr context from filter
+
+ std::list<ExprContext*> expr_context_list;
+ ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
+ ASSERT_FALSE(expr_context_list.empty());
+
+ for (TupleRow& row : *rows1) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+ for (TupleRow& row : *rows2) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+
+ // test null
+ for (ExprContext* ctx : expr_context_list) {
+ TupleRow row;
+ row._tuples[0] = nullptr;
+ ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+ }
+}
+
+TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_in_merge_bloom_filter_upgrade_test) {
+ SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+ ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+ ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+ TQueryOptions options;
+ options.runtime_filter_max_in_num = 100;
+
+ auto rows1 = create_rows(&_obj_pool, 1, 1);
+ auto rows2 = create_rows(&_obj_pool, 2, 2);
+
+ IRuntimeFilter* runtime_filter =
+ create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter, build_expr_ctx, rows1);
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+
+ IRuntimeFilter* runtime_filter2 =
+ create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter2, build_expr_ctx, rows2);
+ ASSERT_TRUE(runtime_filter2->is_bloomfilter());
+
+ Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+ ASSERT_TRUE(status.ok());
+ ASSERT_FALSE(runtime_filter->is_ignored());
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+ // get expr context from filter
+
+ std::list<ExprContext*> expr_context_list;
+ ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
+ ASSERT_FALSE(expr_context_list.empty());
+
+ for (TupleRow& row : *rows1) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+ for (TupleRow& row : *rows2) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+
+ // test null
+ for (ExprContext* ctx : expr_context_list) {
+ TupleRow row;
+ row._tuples[0] = nullptr;
+ ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+ }
+}
+
+TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_bloom_filter_merge_in_test) {
+ SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+ ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+ ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+ TQueryOptions options;
+ options.runtime_filter_max_in_num = 3;
+
+ auto rows1 = create_rows(&_obj_pool, 1, 3);
+ auto rows2 = create_rows(&_obj_pool, 4, 4);
+
+ IRuntimeFilter* runtime_filter =
+ create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter, build_expr_ctx, rows1);
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+ runtime_filter->change_to_bloom_filter();
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+ IRuntimeFilter* runtime_filter2 =
+ create_runtime_filter(TRuntimeFilterType::IN, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter2, build_expr_ctx, rows2);
+ ASSERT_FALSE(runtime_filter2->is_bloomfilter());
+
+ Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+ ASSERT_TRUE(status.ok());
+ ASSERT_FALSE(runtime_filter->is_ignored());
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+ // get expr context from filter
+
+ std::list<ExprContext*> expr_context_list;
+ ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
+ ASSERT_FALSE(expr_context_list.empty());
+
+ for (TupleRow& row : *rows1) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+ for (TupleRow& row : *rows2) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+
+ // test null
+ for (ExprContext* ctx : expr_context_list) {
+ TupleRow row;
+ row._tuples[0] = nullptr;
+ ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+ }
+}
+
+TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_bloom_filter_merge_bloom_filter_test) {
+ SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+ ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+ ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+ TQueryOptions options;
+ options.runtime_filter_max_in_num = 3;
+
+ auto rows1 = create_rows(&_obj_pool, 1, 3);
+ auto rows2 = create_rows(&_obj_pool, 4, 6);
+
+ IRuntimeFilter* runtime_filter =
+ create_runtime_filter(TRuntimeFilterType::IN_OR_BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter, build_expr_ctx, rows1);
+ ASSERT_FALSE(runtime_filter->is_bloomfilter());
+ runtime_filter->change_to_bloom_filter();
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+
+ IRuntimeFilter* runtime_filter2 =
+ create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
_runtime_stat.get(), &_obj_pool);
+ insert(runtime_filter2, build_expr_ctx, rows2);
+ ASSERT_TRUE(runtime_filter2->is_bloomfilter());
+
+ Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
+ ASSERT_TRUE(status.ok());
+ ASSERT_FALSE(runtime_filter->is_ignored());
+ ASSERT_TRUE(runtime_filter->is_bloomfilter());
+//
ASSERT_TRUE(runtime_filter->get_profile()->get_info_string("RealRuntimeFilterType")
==
+//
::doris::to_string(doris::RuntimeFilterType::BLOOM_FILTER);
+
+ // get expr context from filter
+
+ std::list<ExprContext*> expr_context_list;
+ ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
+ ASSERT_FALSE(expr_context_list.empty());
+
+ for (TupleRow& row : *rows1) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+ for (TupleRow& row : *rows2) {
+ for (ExprContext* ctx : expr_context_list) {
+ ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+ }
+ }
+
+ // test null
+ for (ExprContext* ctx : expr_context_list) {
+ TupleRow row;
+ row._tuples[0] = nullptr;
+ ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+ }
+}
+
} // namespace doris
int main(int argc, char** argv) {
diff --git a/docs/en/administrator-guide/runtime-filter.md
b/docs/en/administrator-guide/runtime-filter.md
index 549cc9e..5db59cc 100644
--- a/docs/en/administrator-guide/runtime-filter.md
+++ b/docs/en/administrator-guide/runtime-filter.md
@@ -91,7 +91,7 @@ For query options related to Runtime Filter, please refer to
the following secti
- The first query option is to adjust the type of Runtime Filter used. In most
cases, you only need to adjust this option, and keep the other options as
default.
- - `runtime_filter_type`: Including Bloom Filter, MinMax Filter and IN
predicate. By default, only IN predicate will be used conservatively. In some
cases, the performance will be higher when both Bloom Filter, MinMax Filter and
IN predicate are used at the same time.
+ - `runtime_filter_type`: The supported types are Bloom Filter, MinMax Filter, IN predicate and IN_OR_BLOOM Filter. By default, only the IN_OR_BLOOM Filter is used. In some cases, performance is higher when Bloom Filter, MinMax Filter and IN predicate are all used at the same time.
- Other query options usually only need to be further adjusted in certain
specific scenarios to achieve the best results. Usually only after performance
testing, optimize for resource-intensive, long enough running time and high
enough frequency queries.
@@ -114,7 +114,7 @@ The query options are further explained below.
#### 1.runtime_filter_type
Type of Runtime Filter used.
-**Type**: Number (1, 2, 4) or the corresponding mnemonic string (IN,
BLOOM_FILTER, MIN_MAX), the default is 1 (IN predicate), use multiple commas to
separate, pay attention to the need to add quotation marks , Or add any number
of types, for example:
+**Type**: Number (1, 2, 4, 8) or the corresponding mnemonic string (IN, BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER). The default is 8 (IN_OR_BLOOM_FILTER). To combine several types, either list the mnemonic strings separated by commas and enclosed in quotation marks, or add the corresponding numbers together, for example:
```
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
```
@@ -125,6 +125,8 @@ set runtime_filter_type=7;
**Precautions for use**
+- **IN or Bloom Filter**: The system automatically decides whether to use an
IN predicate or a Bloom Filter based on the actual number of rows in the right
table during execution.
+ - By default, the IN predicate is used when the number of data rows in
the right table is less than 1024 (adjustable via the
`runtime_filter_max_in_num` session variable); otherwise, a Bloom Filter is
used.
- **Bloom Filter**: There is a certain misjudgment rate, which results in the
filtered data being a little less than expected, but it will not cause the
final result to be inaccurate. In most cases, Bloom Filter can improve
performance or has no significant impact on performance, but in some cases
Under circumstances will cause performance degradation.
- Bloom Filter construction and application overhead is high, so when the
filtering rate is low, or the amount of data in the left table is small, Bloom
Filter may cause performance degradation.
- At present, only the Key column of the left table can be pushed down to
the storage engine if the Bloom Filter is applied, and the test results show
that the performance of the Bloom Filter is often reduced when the Bloom Filter
is not pushed down to the storage engine.
diff --git a/docs/zh-CN/administrator-guide/runtime-filter.md
b/docs/zh-CN/administrator-guide/runtime-filter.md
index 6da7d0e..2e6d897 100644
--- a/docs/zh-CN/administrator-guide/runtime-filter.md
+++ b/docs/zh-CN/administrator-guide/runtime-filter.md
@@ -91,7 +91,7 @@ Runtime Filter主要用于优化针对大表的join,如果左表的数据量
- 第一个查询选项是调整使用的Runtime Filter类型,大多数情况下,您只需要调整这一个选项,其他选项保持默认即可。
- - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN
predicate,默认会保守的只使用IN predicate,部分情况下同时使用Bloom Filter、MinMax Filter、IN
predicate时性能更高。
+ - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN
predicate、IN_OR_BLOOM Filter,默认会使用IN_OR_BLOOM Filter,部分情况下同时使用Bloom
Filter、MinMax Filter、IN predicate时性能更高。
- 其他查询选项通常仅在某些特定场景下,才需进一步调整以达到最优效果。通常只在性能测试后,针对资源密集型、运行耗时足够长且频率足够高的查询进行优化。
@@ -114,7 +114,7 @@ Runtime Filter主要用于优化针对大表的join,如果左表的数据量
#### 1.runtime_filter_type
使用的Runtime Filter类型。
-**类型**: 数字(1, 2, 4)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX),默认1(IN
predicate),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
+**类型**: 数字(1, 2, 4, 8)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX,
IN_OR_BLOOM_FILTER),默认8(IN_OR_BLOOM_FILTER),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
```
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
```
@@ -125,6 +125,8 @@ set runtime_filter_type=7;
**使用注意事项**
+- **IN or Bloom Filter**: 根据右表在执行过程中的真实行数,由系统自动判断使用 IN predicate 还是 Bloom
Filter。
+ - 默认在右表数据行数少于1024时会使用IN
predicate(可通过session变量中的`runtime_filter_max_in_num`调整),否则使用Bloom Filter。
- **Bloom Filter**: 有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确,在大部分情况下Bloom
Filter都可以提升性能或对性能没有显著影响,但在部分情况下会导致性能降低。
- Bloom Filter构建和应用的开销较高,所以当过滤率较低时,或者左表数据量较少时,Bloom Filter可能会导致性能降低。
- 目前只有左表的Key列应用Bloom Filter才能下推到存储引擎,而测试结果显示Bloom Filter不下推到存储引擎时往往会导致性能降低。
@@ -138,7 +140,6 @@ set runtime_filter_type=7;
- 默认只有右表数据行数少于1024才会下推(可通过session变量中的`runtime_filter_max_in_num`调整)。
- 目前IN predicate已实现合并方法。
- 当同时指定In
predicate和其他filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因是In
predicate是精确的过滤条件,即使没有其他filter也可以高效过滤,如果同时使用则其他filter会做无用功。目前仅在Runtime
filter的生产者和消费者处于同一个fragment时才会有去除非in filter的逻辑。
-- **
#### 2.runtime_filter_mode
用于控制Runtime Filter在instance之间传输的范围。
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
index 8806263..12dea2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
@@ -41,7 +41,8 @@ public class RuntimeFilterTypeHelper {
private static final Logger LOG =
LogManager.getLogger(RuntimeFilterTypeHelper.class);
public final static long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue() |
- TRuntimeFilterType.BLOOM.getValue() |
TRuntimeFilterType.MIN_MAX.getValue());
+ TRuntimeFilterType.BLOOM.getValue() |
TRuntimeFilterType.MIN_MAX.getValue() |
+ TRuntimeFilterType.IN_OR_BLOOM.getValue());
private final static Map<String, Long> varValueSet =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
@@ -49,6 +50,7 @@ public class RuntimeFilterTypeHelper {
varValueSet.put("IN", (long) TRuntimeFilterType.IN.getValue());
varValueSet.put("BLOOM_FILTER", (long)
TRuntimeFilterType.BLOOM.getValue());
varValueSet.put("MIN_MAX", (long)
TRuntimeFilterType.MIN_MAX.getValue());
+ varValueSet.put("IN_OR_BLOOM_FILTER", (long)
TRuntimeFilterType.IN_OR_BLOOM.getValue());
}
// convert long type variable value to string type that user can read
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c4bfdc2..c49134f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -385,9 +385,9 @@ public class SessionVariable implements Serializable,
Writable {
private int runtimeFilterWaitTimeMs = 1000;
@VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM)
private int runtimeFiltersMaxNum = 10;
- // Set runtimeFilterType to IN filter
+ // Set runtimeFilterType to IN_OR_BLOOM filter
@VariableMgr.VarAttr(name = RUNTIME_FILTER_TYPE)
- private int runtimeFilterType = 1;
+ private int runtimeFilterType = 8;
@VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
private int runtimeFilterMaxInNum = 1024;
@VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 001fab6..63c5de5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1367,23 +1367,15 @@ public class QueryPlanTest {
queryStr = "explain select * from jointest t2, jointest t1 where t1.k1
<=> t2.k1";
Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterMode", "LOCAL");
- Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 7);
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 15);
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
Assert.assertFalse(explainString.contains("runtime filter"));
queryStr = "explain select * from jointest as a where k1 = (select
count(1) from jointest as b where a.k1 = b.k1);";
Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterMode", "GLOBAL");
- Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 7);
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 15);
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
Assert.assertFalse(explainString.contains("runtime filter"));
-
- // support merge in filter, and forbidden implicit conversion to bloom
filter
- queryStr = "explain select * from jointest t2 join [shuffle] jointest
t1 where t1.k1 = t2.k1";
- Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterMode", "GLOBAL");
- Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", TRuntimeFilterType.IN.getValue());
- explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
- Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`"));
- Assert.assertFalse(explainString.contains("runtime filters:
RF000[bloom] -> `t2`.`k1`"));
}
@Test
@@ -1430,6 +1422,64 @@ public class QueryPlanTest {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`"));
Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 8);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[in_or_bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 9);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 10);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[bloom] <- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[bloom] -> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 11);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[in_or_bloom] -> `t2`.`k1`"));
+
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 12);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[min_max] <- `t1`.`k1`, RF001[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[min_max] -> `t2`.`k1`, RF001[in_or_bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 13);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`, RF002[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`, RF002[in_or_bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 14);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[bloom] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`, RF002[in_or_bloom] <-
`t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[bloom] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`, RF002[in_or_bloom] ->
`t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", 15);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`,
RF003[in_or_bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`,
RF003[in_or_bloom] -> `t2`.`k1`"));
+
+ // support merging IN filters, and forbid implicit conversion to bloom
filter
+ queryStr = "explain select * from jointest t2 join [shuffle] jointest
t1 where t1.k1 = t2.k1";
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterMode", "GLOBAL");
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", TRuntimeFilterType.IN.getValue());
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
-> `t2`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in]
<- `t1`.`k1`"));
+ Assert.assertFalse(explainString.contains("runtime filters:
RF000[bloom] -> `t2`.`k1`"));
+ Assert.assertFalse(explainString.contains("runtime filters:
RF000[bloom] <- `t1`.`k1`"));
+
+ queryStr = "explain select * from jointest t2 join [shuffle] jointest
t1 where t1.k1 = t2.k1";
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterMode", "GLOBAL");
+ Deencapsulation.setField(connectContext.getSessionVariable(),
"runtimeFilterType", TRuntimeFilterType.IN_OR_BLOOM.getValue());
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[in_or_bloom] -> `t2`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters:
RF000[in_or_bloom] <- `t1`.`k1`"));
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
index e5c361f..e03063c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
@@ -148,23 +148,25 @@ public class RuntimeFilterGeneratorTest {
ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
result = "GLOBAL";
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
- result = 7;
+ result = 15;
}
};
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
-
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
-
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
- Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
- Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
- Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
, "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
", RF001[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
- ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF003[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
, "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
", RF001[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
- ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF003[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
clearRuntimeFilterState();
new Expectations() {
@@ -174,19 +176,21 @@ public class RuntimeFilterGeneratorTest {
}
};
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
-
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
-
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
- Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
- Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
- Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
, "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
", RF001[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
- ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF003[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
, "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
", RF001[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
- ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF003[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
clearRuntimeFilterState();
new Expectations() {
@@ -211,7 +215,7 @@ public class RuntimeFilterGeneratorTest {
new Expectations() {
{
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
- result = 8;
+ result = 16;
}
};
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
@@ -350,6 +354,197 @@ public class RuntimeFilterGeneratorTest {
Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 7;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 8;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 9;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 10;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 11;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 12;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 13;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 14;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 15;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[min_max] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF003[in_or_bloom] <-
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[min_max] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF003[in_or_bloom] ->
`default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 4);
+
Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 4);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 4);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 4);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 4);
}
@Test(expected = IllegalStateException.class)
@@ -370,7 +565,7 @@ public class RuntimeFilterGeneratorTest {
new Expectations() {
{
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
- result = 8;
+ result = 16;
}
};
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
index d0ce41c..17a8851 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
@@ -50,6 +50,30 @@ public class RuntimeFilterTypeHelperTest {
runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX";
Assert.assertEquals(new Long(7L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+ runtimeFilterType = "IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(8L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(9L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "BLOOM_FILTER,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(10L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,BLOOM_FILTER,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(11L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "MIN_MAX,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(12L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,MIN_MAX,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(13L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "BLOOM_FILTER,MIN_MAX,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(14L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX,IN_OR_BLOOM_FILTER";
+ Assert.assertEquals(new Long(15L),
RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
long runtimeFilterTypeValue = 0L;
Assert.assertEquals("",
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
@@ -61,6 +85,9 @@ public class RuntimeFilterTypeHelperTest {
runtimeFilterTypeValue = 7L;
Assert.assertEquals("BLOOM_FILTER,IN,MIN_MAX",
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+
+ runtimeFilterTypeValue = 15L;
+ Assert.assertEquals("BLOOM_FILTER,IN,IN_OR_BLOOM_FILTER,MIN_MAX",
RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
}
@Test(expected = DdlException.class)
@@ -71,7 +98,7 @@ public class RuntimeFilterTypeHelperTest {
@Test(expected = DdlException.class)
public void testInvalidDecode() throws DdlException {
- RuntimeFilterTypeHelper.decode(10L);
+ RuntimeFilterTypeHelper.decode(16L);
Assert.fail("No exception throws");
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index fc8582f..074ab86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -146,6 +146,11 @@ public class VariableMgrTest {
executor.execute();
Assert.assertEquals(2L,
ctx.getSessionVariable().getRuntimeFilterType());
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
runtime_filter_type ='IN_OR_BLOOM_FILTER'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(8L,
ctx.getSessionVariable().getRuntimeFilterType());
+
// Get from name
SysVariableDesc desc = new SysVariableDesc("exec_mem_limit");
Assert.assertEquals(var.getMaxExecMemByte() + "",
VariableMgr.getValue(var, desc));
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index b78a64e..d01a5fe 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -360,6 +360,7 @@ enum PFilterType {
BLOOM_FILTER = 1;
MINMAX_FILTER = 2;
IN_FILTER = 3;
+ IN_OR_BLOOM_FILTER = 4;
};
message PMergeFilterRequest {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2ca3630..0fb74a5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -704,6 +704,7 @@ enum TRuntimeFilterType {
IN = 1
BLOOM = 2
MIN_MAX = 4
+ IN_OR_BLOOM = 8
}
// Specification of a runtime filter.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]