This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 95c91fab2e [refactor](vec) delete non-vec runtime filter (#16016)
95c91fab2e is described below
commit 95c91fab2e6d25634e2b3915b0bfad583964bc3a
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 18 17:49:20 2023 +0800
[refactor](vec) delete non-vec runtime filter (#16016)
* [refactor](vec) delete non-vec runtime filter
* update
---
be/src/exprs/runtime_filter.cpp | 176 ++---------
be/src/exprs/runtime_filter.h | 27 --
be/src/exprs/runtime_filter_slots.h | 1 -
be/test/CMakeLists.txt | 1 -
be/test/exprs/runtime_filter_test.cpp | 538 ----------------------------------
5 files changed, 26 insertions(+), 717 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index d5a97d766a..b2dc2351a6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -25,10 +25,7 @@
#include "exprs/bitmapfilter_predicate.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/create_predicate_function.h"
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
#include "exprs/hybrid_set.h"
-#include "exprs/in_predicate.h"
#include "exprs/literal.h"
#include "exprs/minmax_predicate.h"
#include "gen_cpp/internal_service.pb.h"
@@ -229,7 +226,6 @@ PFilterType get_type(RuntimeFilterType type) {
}
}
-template <bool is_vectorized = false>
Status create_literal(ObjectPool* pool, const TypeDescriptor& type, const
void* data, void** expr) {
TExprNode node;
@@ -315,11 +311,7 @@ Status create_literal(ObjectPool* pool, const
TypeDescriptor& type, const void*
return Status::InvalidArgument("Invalid type!");
}
- if constexpr (is_vectorized) {
- *reinterpret_cast<vectorized::VExpr**>(expr) = pool->add(new
vectorized::VLiteral(node));
- } else {
- *reinterpret_cast<Expr**>(expr) = pool->add(new Literal(node));
- }
+ *reinterpret_cast<vectorized::VExpr**>(expr) = pool->add(new
vectorized::VLiteral(node));
return Status::OK();
}
@@ -344,7 +336,7 @@ BinaryPredicate* create_bin_predicate(ObjectPool* pool,
PrimitiveType prim_type,
}
Status create_vbin_predicate(ObjectPool* pool, const TypeDescriptor& type,
TExprOpcode::type opcode,
- doris::vectorized::VExpr** expr, TExprNode*
tnode) {
+ vectorized::VExpr** expr, TExprNode* tnode) {
TExprNode node;
TScalarType tscalar_type;
tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
@@ -400,7 +392,7 @@ Status create_vbin_predicate(ObjectPool* pool, const
TypeDescriptor& type, TExpr
fn.__set_has_var_args(false);
node.__set_fn(fn);
*tnode = node;
- return doris::vectorized::VExpr::create_expr(pool, node, expr);
+ return vectorized::VExpr::create_expr(pool, node, expr);
}
// This class is a wrapper of runtime predicate function
class RuntimePredicateWrapper {
@@ -610,11 +602,8 @@ public:
return real_filter_type;
}
- template <class T>
- Status get_push_context(T* container, RuntimeState* state, ExprContext*
prob_expr);
-
- Status get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container,
RuntimeState* state,
- doris::vectorized::VExprContext* prob_expr);
+ Status get_push_vexprs(std::vector<vectorized::VExpr*>* container,
RuntimeState* state,
+ vectorized::VExprContext* prob_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
bool can_not_merge_in_or_bloom = _filter_type ==
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -1156,14 +1145,6 @@ void IRuntimeFilter::publish_finally() {
join_rpc();
}
-Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>*
push_expr_ctxs) {
- DCHECK(is_consumer());
- if (!_is_ignored) {
- return _wrapper->get_push_context(push_expr_ctxs, _state, _probe_ctx);
- }
- return Status::OK();
-}
-
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>*
push_vexprs) {
DCHECK(is_consumer());
if (!_is_ignored) {
@@ -1176,32 +1157,7 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
}
}
-Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>*
push_expr_ctxs,
- ExprContext* probe_ctx) {
- DCHECK(is_producer());
- return _wrapper->get_push_context(push_expr_ctxs, _state, probe_ctx);
-}
-
-Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>*
push_expr_ctxs,
- const RowDescriptor& desc) {
- if (_is_ignored) {
- return Status::OK();
- }
- DCHECK(!_state->enable_pipeline_exec() && _rf_state ==
RuntimeFilterState::READY);
- DCHECK(is_consumer());
- std::lock_guard<std::mutex> guard(_inner_mutex);
-
- if (_push_down_ctxs.empty()) {
- RETURN_IF_ERROR(_wrapper->get_push_context(&_push_down_ctxs, _state,
_probe_ctx));
- RETURN_IF_ERROR(Expr::prepare(_push_down_ctxs, _state, desc));
- RETURN_IF_ERROR(Expr::open(_push_down_ctxs, _state));
- }
- // push expr
- push_expr_ctxs->insert(push_expr_ctxs->end(), _push_down_ctxs.begin(),
_push_down_ctxs.end());
- return Status::OK();
-}
-
-Status
IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>*
vexprs,
+Status IRuntimeFilter::get_prepared_vexprs(std::vector<vectorized::VExpr*>*
vexprs,
const RowDescriptor& desc) {
_profile->add_info_string("Info", _format_status());
if (_is_ignored) {
@@ -1352,8 +1308,8 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
_expr_order = desc->expr_order;
_filter_id = desc->filter_id;
- ExprContext* build_ctx = nullptr;
- RETURN_IF_ERROR(Expr::create_expr_tree(_pool, desc->src_expr, &build_ctx));
+ vectorized::VExprContext* build_ctx = nullptr;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr,
&build_ctx));
RuntimeFilterParams params;
params.fragment_instance_id = fragment_instance_id;
@@ -1372,9 +1328,9 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
if (!desc->__isset.bitmap_target_expr) {
return Status::InvalidArgument("Unknown bitmap filter target
expr.");
}
- doris::vectorized::VExprContext* bitmap_target_ctx = nullptr;
- RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool,
desc->bitmap_target_expr,
-
&bitmap_target_ctx));
+ vectorized::VExprContext* bitmap_target_ctx = nullptr;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool,
desc->bitmap_target_expr,
+
&bitmap_target_ctx));
params.column_return_type = bitmap_target_ctx->root()->type().type;
if (desc->__isset.bitmap_filter_not_in) {
@@ -1389,9 +1345,7 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
DCHECK(false) << "runtime filter not found node_id:" << node_id;
return Status::InternalError("not found a node id");
}
- RETURN_IF_ERROR(Expr::create_expr_tree(_pool, iter->second,
&_probe_ctx));
- RETURN_IF_ERROR(
- doris::vectorized::VExpr::create_expr_tree(_pool,
iter->second, &_vprobe_ctx));
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool,
iter->second, &_vprobe_ctx));
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
@@ -1607,21 +1561,19 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
return;
}
case TYPE_DATEV2: {
-
batch_copy<doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>>(
+ batch_copy<vectorized::DateV2Value<vectorized::DateV2ValueType>>(
filter, it,
[](PColumnValue* column,
- const
doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>*
- value) {
+ const vectorized::DateV2Value<vectorized::DateV2ValueType>*
value) {
column->set_intval(*reinterpret_cast<const
int32_t*>(value));
});
return;
}
case TYPE_DATETIMEV2: {
-
batch_copy<doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>>(
+ batch_copy<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
filter, it,
[](PColumnValue* column,
- const
doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>*
- value) {
+ const
vectorized::DateV2Value<vectorized::DateTimeV2ValueType>* value) {
column->set_longval(*reinterpret_cast<const
int64_t*>(value));
});
return;
@@ -1812,86 +1764,12 @@ Status IRuntimeFilter::update_filter(const
UpdateRuntimeFilterParams* param) {
Status IRuntimeFilter::consumer_close() {
DCHECK(is_consumer());
- Expr::close(_push_down_ctxs, _state);
- return Status::OK();
-}
-
-template <class T>
-Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState*
state,
- ExprContext* prob_expr) {
- DCHECK(state != nullptr);
- DCHECK(container != nullptr);
- DCHECK(_pool != nullptr);
- DCHECK(prob_expr->root()->type().type == _column_return_type ||
- (is_string_type(prob_expr->root()->type().type) &&
is_string_type(_column_return_type)));
-
- auto real_filter_type = get_real_type();
- switch (real_filter_type) {
- case RuntimeFilterType::IN_FILTER: {
- if (!_is_ignored_in_filter) {
- TTypeDesc type_desc = create_type_desc(_column_return_type);
- TExprNode node;
- node.__set_type(type_desc);
- node.__set_node_type(TExprNodeType::IN_PRED);
- node.in_predicate.__set_is_not_in(false);
- node.__set_opcode(TExprOpcode::FILTER_IN);
- node.__isset.vector_opcode = true;
- node.__set_vector_opcode(to_in_opcode(_column_return_type));
- auto in_pred = _pool->add(new InPredicate(node));
- RETURN_IF_ERROR(in_pred->prepare(state,
_context.hybrid_set.get()));
- in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- ExprContext* ctx = _pool->add(new ExprContext(in_pred));
- container->push_back(ctx);
- }
- break;
- }
- case RuntimeFilterType::MINMAX_FILTER: {
- // create max filter
- Expr* max_literal = nullptr;
- auto max_pred = create_bin_predicate(_pool, _column_return_type,
TExprOpcode::LE);
- RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
- _context.minmax_func->get_max(),
- (void**)&max_literal));
- max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- max_pred->add_child(max_literal);
- container->push_back(_pool->add(new ExprContext(max_pred)));
- // create min filter
- Expr* min_literal = nullptr;
- auto min_pred = create_bin_predicate(_pool, _column_return_type,
TExprOpcode::GE);
- RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
- _context.minmax_func->get_min(),
- (void**)&min_literal));
- min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- min_pred->add_child(min_literal);
- container->push_back(_pool->add(new ExprContext(min_pred)));
- break;
- }
- case RuntimeFilterType::BLOOM_FILTER: {
- // create a bloom filter
- TTypeDesc type_desc = create_type_desc(_column_return_type);
- TExprNode node;
- node.__set_type(type_desc);
- node.__set_node_type(TExprNodeType::BLOOM_PRED);
- node.__set_opcode(TExprOpcode::RT_FILTER);
- node.__isset.vector_opcode = true;
- node.__set_vector_opcode(to_in_opcode(_column_return_type));
- auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
- RETURN_IF_ERROR(bloom_pred->prepare(state,
_context.bloom_filter_func));
- bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
- container->push_back(ctx);
- break;
- }
- default:
- DCHECK(false);
- break;
- }
return Status::OK();
}
-Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::VExpr*>*
container,
+Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<vectorized::VExpr*>*
container,
RuntimeState* state,
-
doris::vectorized::VExprContext* vprob_expr) {
+ vectorized::VExprContext*
vprob_expr) {
DCHECK(state != nullptr);
DCHECK(container != nullptr);
DCHECK(_pool != nullptr);
@@ -1925,15 +1803,14 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
- doris::vectorized::VExpr* max_pred = nullptr;
+ vectorized::VExpr* max_pred = nullptr;
// create max filter
TExprNode max_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(_pool,
vprob_expr->root()->type(), TExprOpcode::LE,
&max_pred, &max_pred_node));
- doris::vectorized::VExpr* max_literal = nullptr;
- RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
- _context.minmax_func->get_max(),
- (void**)&max_literal));
+ vectorized::VExpr* max_literal = nullptr;
+ RETURN_IF_ERROR(create_literal(_pool, vprob_expr->root()->type(),
+ _context.minmax_func->get_max(),
(void**)&max_literal));
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
max_pred->add_child(cloned_vexpr);
max_pred->add_child(max_literal);
@@ -1941,14 +1818,13 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
_pool->add(new
vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred)));
// create min filter
- doris::vectorized::VExpr* min_pred = nullptr;
+ vectorized::VExpr* min_pred = nullptr;
TExprNode min_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(_pool,
vprob_expr->root()->type(), TExprOpcode::GE,
&min_pred, &min_pred_node));
- doris::vectorized::VExpr* min_literal = nullptr;
- RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
- _context.minmax_func->get_min(),
- (void**)&min_literal));
+ vectorized::VExpr* min_literal = nullptr;
+ RETURN_IF_ERROR(create_literal(_pool, vprob_expr->root()->type(),
+ _context.minmax_func->get_min(),
(void**)&min_literal));
cloned_vexpr = vprob_expr->root()->clone(_pool);
min_pred->add_child(cloned_vexpr);
min_pred->add_child(min_literal);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e1a4b236e3..bc818982b0 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -17,7 +17,6 @@
#pragma once
-#include "exprs/expr_context.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/time.h"
@@ -30,7 +29,6 @@ class IOBufAsZeroCopyInputStream;
namespace doris {
class Predicate;
class ObjectPool;
-class ExprContext;
class RuntimeState;
class RuntimePredicateWrapper;
class MemTracker;
@@ -147,7 +145,6 @@ public:
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
_always_true(false),
- _probe_ctx(nullptr),
_is_ignored(false),
registration_time_(MonotonicMillis()) {}
@@ -174,21 +171,8 @@ public:
RuntimeFilterType type() const { return _runtime_filter_type; }
- // get push down expr context
- // This function can only be called once
- // _wrapper's function will be clear
- // only consumer could call this
- Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
-
Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
- // This function is used by UT and producer
- Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs,
ExprContext* probe_ctx);
-
- // This function can be called multiple times
- Status get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
- const RowDescriptor& desc);
-
Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>*
push_vexprs,
const RowDescriptor& desc);
@@ -347,23 +331,12 @@ protected:
// this filter won't filter any data
bool _always_true;
- // build expr_context
- // ExprContext* _build_ctx;
- // probe expr_context
- // it only used in consumer to generate runtime_filter expr_context
- // we don't have to prepare it or close it
- ExprContext* _probe_ctx;
doris::vectorized::VExprContext* _vprobe_ctx;
// Indicate whether runtime filter expr has been ignored
bool _is_ignored;
std::string _ignored_msg;
- // some runtime filter will generate
- // multiple contexts such as minmax filter
- // these context is called prepared by this,
- // consumer_close should be called before release
- std::vector<ExprContext*> _push_down_ctxs;
std::vector<doris::vectorized::VExpr*> _push_down_vexprs;
struct rpc_context;
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index c6fa1d2381..f050a1edb6 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -263,6 +263,5 @@ private:
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
};
-using RuntimeFilterSlots = RuntimeFilterSlotsBase<ExprContext>;
using VRuntimeFilterSlots = RuntimeFilterSlotsBase<vectorized::VExprContext>;
} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d0272fd8ad..c4880806e5 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -61,7 +61,6 @@ set(EXPRS_TEST_FILES
exprs/encryption_functions_test.cpp
exprs/math_functions_test.cpp
exprs/topn_function_test.cpp
- exprs/runtime_filter_test.cpp
exprs/bloom_filter_predicate_test.cpp
exprs/array_functions_test.cpp
exprs/quantile_function_test.cpp
diff --git a/be/test/exprs/runtime_filter_test.cpp
b/be/test/exprs/runtime_filter_test.cpp
deleted file mode 100644
index ee319ac6cb..0000000000
--- a/be/test/exprs/runtime_filter_test.cpp
+++ /dev/null
@@ -1,538 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exprs/runtime_filter.h"
-
-#include <array>
-#include <memory>
-
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/expr_context.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/Planner_types.h"
-#include "gen_cpp/Types_types.h"
-#include "gmock/gmock.h"
-#include "gtest/gtest.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
-
-namespace doris {
-TTypeDesc create_type_desc(PrimitiveType type, int precision, int scale);
-
-class RuntimeFilterTest : public testing::Test {
-public:
- RuntimeFilterTest() {}
- virtual void SetUp() {
- ExecEnv* exec_env = ExecEnv::GetInstance();
- exec_env = nullptr;
- _runtime_stat.reset(
- new RuntimeState(_fragment_id, _query_options, _query_globals,
exec_env));
- _runtime_stat->init_mem_trackers();
- }
- virtual void TearDown() { _obj_pool.clear(); }
-
-private:
- ObjectPool _obj_pool;
- TUniqueId _fragment_id;
- TQueryOptions _query_options;
- TQueryGlobals _query_globals;
-
- std::unique_ptr<RuntimeState> _runtime_stat;
- // std::unique_ptr<IRuntimeFilter> _runtime_filter;
-};
-
-IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type,
TQueryOptions* options,
- RuntimeState* _runtime_stat, ObjectPool*
_obj_pool) {
- TRuntimeFilterDesc desc;
- desc.__set_filter_id(0);
- desc.__set_expr_order(0);
- desc.__set_has_local_targets(true);
- desc.__set_has_remote_targets(false);
- desc.__set_is_broadcast_join(true);
- desc.__set_type(type);
- desc.__set_bloom_filter_size_bytes(4096);
-
- // build src expr context
-
- {
- TExpr build_expr;
- TExprNode expr_node;
- expr_node.__set_node_type(TExprNodeType::SLOT_REF);
- expr_node.__set_type(create_type_desc(TYPE_INT));
- expr_node.__set_num_children(0);
- expr_node.__isset.slot_ref = true;
- TSlotRef slot_ref;
- slot_ref.__set_slot_id(0);
- slot_ref.__set_tuple_id(0);
- expr_node.__set_slot_ref(slot_ref);
- expr_node.__isset.output_column = true;
- expr_node.__set_output_column(0);
- build_expr.nodes.push_back(expr_node);
- desc.__set_src_expr(build_expr);
- }
- // build dst expr
- {
- TExpr target_expr;
- TExprNode expr_node;
- expr_node.__set_node_type(TExprNodeType::SLOT_REF);
- expr_node.__set_type(create_type_desc(TYPE_INT));
- expr_node.__set_num_children(0);
- expr_node.__isset.slot_ref = true;
- TSlotRef slot_ref;
- slot_ref.__set_slot_id(0);
- slot_ref.__set_tuple_id(0);
- expr_node.__set_slot_ref(slot_ref);
- expr_node.__isset.output_column = true;
- expr_node.__set_output_column(0);
- target_expr.nodes.push_back(expr_node);
- std::map<int, TExpr> planid_to_target_expr = {{0, target_expr}};
- desc.__set_planId_to_target_expr(planid_to_target_expr);
- }
-
- IRuntimeFilter* runtime_filter = nullptr;
- Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc,
options,
- RuntimeFilterRole::PRODUCER, -1,
&runtime_filter);
-
- EXPECT_TRUE(status.ok()) << status.to_string();
-
- if (auto bf = runtime_filter->get_bloomfilter()) {
- status = bf->init_with_fixed_length();
- EXPECT_TRUE(status.ok()) << status.to_string();
- }
-
- return status.ok() ? runtime_filter : nullptr;
-}
-
-std::vector<TupleRow>* create_rows(ObjectPool* _obj_pool, int from, int to) {
- auto& rows = *(_obj_pool->add(new std::vector<TupleRow>(to - from + 1)));
- int i = 0;
- while (from + i <= to) {
- std::array<int, 2>* data = _obj_pool->add(new std::array<int, 2>());
- data->at(0) = data->at(1) = from + i;
- TupleRow row;
- row._tuples[0] = (Tuple*)data->data();
- rows[i++] = row;
- }
- return &rows;
-}
-
-void insert(IRuntimeFilter* runtime_filter, ExprContext* expr_ctx,
std::vector<TupleRow>* rows) {
- for (TupleRow& row : *rows) {
- void* val = expr_ctx->get_value(&row);
- runtime_filter->insert(val);
- }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- auto tuple_rows = create_rows(&_obj_pool, 1, 1024);
- auto not_exist_data = create_rows(&_obj_pool, 1025, 2048);
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 1024;
-
- IRuntimeFilter* runtime_filter =
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter, build_expr_ctx, tuple_rows);
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_TRUE(!expr_context_list.empty());
-
- // test data in
- for (TupleRow& row : *tuple_rows) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- // test not exist data
- for (TupleRow& row : *not_exist_data) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_merge_in_filter_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 1024 * 2 + 1;
-
- auto rows1 = create_rows(&_obj_pool, 1, 1024);
- auto rows2 = create_rows(&_obj_pool, 1025, 2048);
-
- IRuntimeFilter* runtime_filter =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_TRUE(!expr_context_list.empty());
-
- // test data in
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_ignore_in_filter_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 2;
-
- auto rows1 = create_rows(&_obj_pool, 1, 1);
- auto rows2 = create_rows(&_obj_pool, 2, 2);
-
- IRuntimeFilter* runtime_filter =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_TRUE(runtime_filter->is_ignored());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_TRUE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_in_or_bloom_filter_in_merge_in_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 3;
-
- auto rows1 = create_rows(&_obj_pool, 1, 1);
- auto rows2 = create_rows(&_obj_pool, 2, 2);
-
- IRuntimeFilter* runtime_filter = create_runtime_filter(
- TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(),
&_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
- EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_FALSE(runtime_filter->is_ignored());
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_FALSE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_in_merge_in_upgrade_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 2;
-
- auto rows1 = create_rows(&_obj_pool, 1, 1);
- auto rows2 = create_rows(&_obj_pool, 2, 2);
-
- IRuntimeFilter* runtime_filter = create_runtime_filter(
- TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(),
&_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
- EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_FALSE(runtime_filter->is_ignored());
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_FALSE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_in_merge_bloom_filter_upgrade_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 100;
-
- auto rows1 = create_rows(&_obj_pool, 1, 1);
- auto rows2 = create_rows(&_obj_pool, 2, 2);
-
- IRuntimeFilter* runtime_filter = create_runtime_filter(
- TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(),
&_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
- EXPECT_TRUE(runtime_filter2->is_bloomfilter());
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_FALSE(runtime_filter->is_ignored());
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_FALSE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_bloom_filter_merge_in_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 3;
-
- auto rows1 = create_rows(&_obj_pool, 1, 3);
- auto rows2 = create_rows(&_obj_pool, 4, 4);
-
- IRuntimeFilter* runtime_filter = create_runtime_filter(
- TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(),
&_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
- runtime_filter->change_to_bloom_filter();
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::IN, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
- EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_FALSE(runtime_filter->is_ignored());
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_FALSE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-TEST_F(RuntimeFilterTest,
runtime_filter_in_or_bloom_filter_bloom_filter_merge_bloom_filter_test) {
- SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
- ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
- ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
- TQueryOptions options;
- options.runtime_filter_max_in_num = 3;
-
- auto rows1 = create_rows(&_obj_pool, 1, 3);
- auto rows2 = create_rows(&_obj_pool, 4, 6);
-
- IRuntimeFilter* runtime_filter = create_runtime_filter(
- TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(),
&_obj_pool);
- insert(runtime_filter, build_expr_ctx, rows1);
- EXPECT_FALSE(runtime_filter->is_bloomfilter());
- runtime_filter->change_to_bloom_filter();
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- IRuntimeFilter* runtime_filter2 =
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-
_runtime_stat.get(), &_obj_pool);
- insert(runtime_filter2, build_expr_ctx, rows2);
- EXPECT_TRUE(runtime_filter2->is_bloomfilter());
-
- Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
- EXPECT_TRUE(status.ok());
- EXPECT_FALSE(runtime_filter->is_ignored());
- EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
- // get expr context from filter
-
- std::list<ExprContext*> expr_context_list;
- EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list,
prob_expr_ctx).ok());
- EXPECT_FALSE(expr_context_list.empty());
-
- for (TupleRow& row : *rows1) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
- for (TupleRow& row : *rows2) {
- for (ExprContext* ctx : expr_context_list) {
- EXPECT_TRUE(ctx->get_boolean_val(&row).val);
- }
- }
-
- // test null
- for (ExprContext* ctx : expr_context_list) {
- TupleRow row;
- row._tuples[0] = nullptr;
- EXPECT_FALSE(ctx->get_boolean_val(&row).val);
- }
-}
-
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]