This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c91fab2e [refactor](vec) delete non-vec runtime filter (#16016)
95c91fab2e is described below

commit 95c91fab2e6d25634e2b3915b0bfad583964bc3a
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 18 17:49:20 2023 +0800

    [refactor](vec) delete non-vec runtime filter (#16016)
    
    * [refactor](vec) delete non-vec runtime filter
    
    * update
---
 be/src/exprs/runtime_filter.cpp       | 176 ++---------
 be/src/exprs/runtime_filter.h         |  27 --
 be/src/exprs/runtime_filter_slots.h   |   1 -
 be/test/CMakeLists.txt                |   1 -
 be/test/exprs/runtime_filter_test.cpp | 538 ----------------------------------
 5 files changed, 26 insertions(+), 717 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index d5a97d766a..b2dc2351a6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -25,10 +25,7 @@
 #include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/create_predicate_function.h"
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
 #include "exprs/hybrid_set.h"
-#include "exprs/in_predicate.h"
 #include "exprs/literal.h"
 #include "exprs/minmax_predicate.h"
 #include "gen_cpp/internal_service.pb.h"
@@ -229,7 +226,6 @@ PFilterType get_type(RuntimeFilterType type) {
     }
 }
 
-template <bool is_vectorized = false>
 Status create_literal(ObjectPool* pool, const TypeDescriptor& type, const 
void* data, void** expr) {
     TExprNode node;
 
@@ -315,11 +311,7 @@ Status create_literal(ObjectPool* pool, const 
TypeDescriptor& type, const void*
         return Status::InvalidArgument("Invalid type!");
     }
 
-    if constexpr (is_vectorized) {
-        *reinterpret_cast<vectorized::VExpr**>(expr) = pool->add(new 
vectorized::VLiteral(node));
-    } else {
-        *reinterpret_cast<Expr**>(expr) = pool->add(new Literal(node));
-    }
+    *reinterpret_cast<vectorized::VExpr**>(expr) = pool->add(new 
vectorized::VLiteral(node));
 
     return Status::OK();
 }
@@ -344,7 +336,7 @@ BinaryPredicate* create_bin_predicate(ObjectPool* pool, 
PrimitiveType prim_type,
 }
 
 Status create_vbin_predicate(ObjectPool* pool, const TypeDescriptor& type, 
TExprOpcode::type opcode,
-                             doris::vectorized::VExpr** expr, TExprNode* 
tnode) {
+                             vectorized::VExpr** expr, TExprNode* tnode) {
     TExprNode node;
     TScalarType tscalar_type;
     tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
@@ -400,7 +392,7 @@ Status create_vbin_predicate(ObjectPool* pool, const 
TypeDescriptor& type, TExpr
     fn.__set_has_var_args(false);
     node.__set_fn(fn);
     *tnode = node;
-    return doris::vectorized::VExpr::create_expr(pool, node, expr);
+    return vectorized::VExpr::create_expr(pool, node, expr);
 }
 // This class is a wrapper of runtime predicate function
 class RuntimePredicateWrapper {
@@ -610,11 +602,8 @@ public:
         return real_filter_type;
     }
 
-    template <class T>
-    Status get_push_context(T* container, RuntimeState* state, ExprContext* 
prob_expr);
-
-    Status get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container, 
RuntimeState* state,
-                           doris::vectorized::VExprContext* prob_expr);
+    Status get_push_vexprs(std::vector<vectorized::VExpr*>* container, 
RuntimeState* state,
+                           vectorized::VExprContext* prob_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
         bool can_not_merge_in_or_bloom = _filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -1156,14 +1145,6 @@ void IRuntimeFilter::publish_finally() {
     join_rpc();
 }
 
-Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* 
push_expr_ctxs) {
-    DCHECK(is_consumer());
-    if (!_is_ignored) {
-        return _wrapper->get_push_context(push_expr_ctxs, _state, _probe_ctx);
-    }
-    return Status::OK();
-}
-
 Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* 
push_vexprs) {
     DCHECK(is_consumer());
     if (!_is_ignored) {
@@ -1176,32 +1157,7 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
     }
 }
 
-Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* 
push_expr_ctxs,
-                                          ExprContext* probe_ctx) {
-    DCHECK(is_producer());
-    return _wrapper->get_push_context(push_expr_ctxs, _state, probe_ctx);
-}
-
-Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* 
push_expr_ctxs,
-                                            const RowDescriptor& desc) {
-    if (_is_ignored) {
-        return Status::OK();
-    }
-    DCHECK(!_state->enable_pipeline_exec() && _rf_state == 
RuntimeFilterState::READY);
-    DCHECK(is_consumer());
-    std::lock_guard<std::mutex> guard(_inner_mutex);
-
-    if (_push_down_ctxs.empty()) {
-        RETURN_IF_ERROR(_wrapper->get_push_context(&_push_down_ctxs, _state, 
_probe_ctx));
-        RETURN_IF_ERROR(Expr::prepare(_push_down_ctxs, _state, desc));
-        RETURN_IF_ERROR(Expr::open(_push_down_ctxs, _state));
-    }
-    // push expr
-    push_expr_ctxs->insert(push_expr_ctxs->end(), _push_down_ctxs.begin(), 
_push_down_ctxs.end());
-    return Status::OK();
-}
-
-Status 
IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* 
vexprs,
+Status IRuntimeFilter::get_prepared_vexprs(std::vector<vectorized::VExpr*>* 
vexprs,
                                            const RowDescriptor& desc) {
     _profile->add_info_string("Info", _format_status());
     if (_is_ignored) {
@@ -1352,8 +1308,8 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     _expr_order = desc->expr_order;
     _filter_id = desc->filter_id;
 
-    ExprContext* build_ctx = nullptr;
-    RETURN_IF_ERROR(Expr::create_expr_tree(_pool, desc->src_expr, &build_ctx));
+    vectorized::VExprContext* build_ctx = nullptr;
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr, 
&build_ctx));
 
     RuntimeFilterParams params;
     params.fragment_instance_id = fragment_instance_id;
@@ -1372,9 +1328,9 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
         if (!desc->__isset.bitmap_target_expr) {
             return Status::InvalidArgument("Unknown bitmap filter target 
expr.");
         }
-        doris::vectorized::VExprContext* bitmap_target_ctx = nullptr;
-        RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, 
desc->bitmap_target_expr,
-                                                                   
&bitmap_target_ctx));
+        vectorized::VExprContext* bitmap_target_ctx = nullptr;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, 
desc->bitmap_target_expr,
+                                                            
&bitmap_target_ctx));
         params.column_return_type = bitmap_target_ctx->root()->type().type;
 
         if (desc->__isset.bitmap_filter_not_in) {
@@ -1389,9 +1345,7 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
             DCHECK(false) << "runtime filter not found node_id:" << node_id;
             return Status::InternalError("not found a node id");
         }
-        RETURN_IF_ERROR(Expr::create_expr_tree(_pool, iter->second, 
&_probe_ctx));
-        RETURN_IF_ERROR(
-                doris::vectorized::VExpr::create_expr_tree(_pool, 
iter->second, &_vprobe_ctx));
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, 
iter->second, &_vprobe_ctx));
     }
 
     _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
@@ -1607,21 +1561,19 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
         return;
     }
     case TYPE_DATEV2: {
-        
batch_copy<doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>>(
+        batch_copy<vectorized::DateV2Value<vectorized::DateV2ValueType>>(
                 filter, it,
                 [](PColumnValue* column,
-                   const 
doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>*
-                           value) {
+                   const vectorized::DateV2Value<vectorized::DateV2ValueType>* 
value) {
                     column->set_intval(*reinterpret_cast<const 
int32_t*>(value));
                 });
         return;
     }
     case TYPE_DATETIMEV2: {
-        
batch_copy<doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>>(
+        batch_copy<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
                 filter, it,
                 [](PColumnValue* column,
-                   const 
doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>*
-                           value) {
+                   const 
vectorized::DateV2Value<vectorized::DateTimeV2ValueType>* value) {
                     column->set_longval(*reinterpret_cast<const 
int64_t*>(value));
                 });
         return;
@@ -1812,86 +1764,12 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
 
 Status IRuntimeFilter::consumer_close() {
     DCHECK(is_consumer());
-    Expr::close(_push_down_ctxs, _state);
-    return Status::OK();
-}
-
-template <class T>
-Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* 
state,
-                                                 ExprContext* prob_expr) {
-    DCHECK(state != nullptr);
-    DCHECK(container != nullptr);
-    DCHECK(_pool != nullptr);
-    DCHECK(prob_expr->root()->type().type == _column_return_type ||
-           (is_string_type(prob_expr->root()->type().type) && 
is_string_type(_column_return_type)));
-
-    auto real_filter_type = get_real_type();
-    switch (real_filter_type) {
-    case RuntimeFilterType::IN_FILTER: {
-        if (!_is_ignored_in_filter) {
-            TTypeDesc type_desc = create_type_desc(_column_return_type);
-            TExprNode node;
-            node.__set_type(type_desc);
-            node.__set_node_type(TExprNodeType::IN_PRED);
-            node.in_predicate.__set_is_not_in(false);
-            node.__set_opcode(TExprOpcode::FILTER_IN);
-            node.__isset.vector_opcode = true;
-            node.__set_vector_opcode(to_in_opcode(_column_return_type));
-            auto in_pred = _pool->add(new InPredicate(node));
-            RETURN_IF_ERROR(in_pred->prepare(state, 
_context.hybrid_set.get()));
-            in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-            ExprContext* ctx = _pool->add(new ExprContext(in_pred));
-            container->push_back(ctx);
-        }
-        break;
-    }
-    case RuntimeFilterType::MINMAX_FILTER: {
-        // create max filter
-        Expr* max_literal = nullptr;
-        auto max_pred = create_bin_predicate(_pool, _column_return_type, 
TExprOpcode::LE);
-        RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
-                                              _context.minmax_func->get_max(),
-                                              (void**)&max_literal));
-        max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-        max_pred->add_child(max_literal);
-        container->push_back(_pool->add(new ExprContext(max_pred)));
-        // create min filter
-        Expr* min_literal = nullptr;
-        auto min_pred = create_bin_predicate(_pool, _column_return_type, 
TExprOpcode::GE);
-        RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
-                                              _context.minmax_func->get_min(),
-                                              (void**)&min_literal));
-        min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-        min_pred->add_child(min_literal);
-        container->push_back(_pool->add(new ExprContext(min_pred)));
-        break;
-    }
-    case RuntimeFilterType::BLOOM_FILTER: {
-        // create a bloom filter
-        TTypeDesc type_desc = create_type_desc(_column_return_type);
-        TExprNode node;
-        node.__set_type(type_desc);
-        node.__set_node_type(TExprNodeType::BLOOM_PRED);
-        node.__set_opcode(TExprOpcode::RT_FILTER);
-        node.__isset.vector_opcode = true;
-        node.__set_vector_opcode(to_in_opcode(_column_return_type));
-        auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
-        RETURN_IF_ERROR(bloom_pred->prepare(state, 
_context.bloom_filter_func));
-        bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-        ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
-        container->push_back(ctx);
-        break;
-    }
-    default:
-        DCHECK(false);
-        break;
-    }
     return Status::OK();
 }
 
-Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::VExpr*>*
 container,
+Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<vectorized::VExpr*>* 
container,
                                                 RuntimeState* state,
-                                                
doris::vectorized::VExprContext* vprob_expr) {
+                                                vectorized::VExprContext* 
vprob_expr) {
     DCHECK(state != nullptr);
     DCHECK(container != nullptr);
     DCHECK(_pool != nullptr);
@@ -1925,15 +1803,14 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
         break;
     }
     case RuntimeFilterType::MINMAX_FILTER: {
-        doris::vectorized::VExpr* max_pred = nullptr;
+        vectorized::VExpr* max_pred = nullptr;
         // create max filter
         TExprNode max_pred_node;
         RETURN_IF_ERROR(create_vbin_predicate(_pool, 
vprob_expr->root()->type(), TExprOpcode::LE,
                                               &max_pred, &max_pred_node));
-        doris::vectorized::VExpr* max_literal = nullptr;
-        RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
-                                             _context.minmax_func->get_max(),
-                                             (void**)&max_literal));
+        vectorized::VExpr* max_literal = nullptr;
+        RETURN_IF_ERROR(create_literal(_pool, vprob_expr->root()->type(),
+                                       _context.minmax_func->get_max(), 
(void**)&max_literal));
         auto cloned_vexpr = vprob_expr->root()->clone(_pool);
         max_pred->add_child(cloned_vexpr);
         max_pred->add_child(max_literal);
@@ -1941,14 +1818,13 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
                 _pool->add(new 
vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred)));
 
         // create min filter
-        doris::vectorized::VExpr* min_pred = nullptr;
+        vectorized::VExpr* min_pred = nullptr;
         TExprNode min_pred_node;
         RETURN_IF_ERROR(create_vbin_predicate(_pool, 
vprob_expr->root()->type(), TExprOpcode::GE,
                                               &min_pred, &min_pred_node));
-        doris::vectorized::VExpr* min_literal = nullptr;
-        RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
-                                             _context.minmax_func->get_min(),
-                                             (void**)&min_literal));
+        vectorized::VExpr* min_literal = nullptr;
+        RETURN_IF_ERROR(create_literal(_pool, vprob_expr->root()->type(),
+                                       _context.minmax_func->get_min(), 
(void**)&min_literal));
         cloned_vexpr = vprob_expr->root()->clone(_pool);
         min_pred->add_child(cloned_vexpr);
         min_pred->add_child(min_literal);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e1a4b236e3..bc818982b0 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "exprs/expr_context.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
@@ -30,7 +29,6 @@ class IOBufAsZeroCopyInputStream;
 namespace doris {
 class Predicate;
 class ObjectPool;
-class ExprContext;
 class RuntimeState;
 class RuntimePredicateWrapper;
 class MemTracker;
@@ -147,7 +145,6 @@ public:
               _role(RuntimeFilterRole::PRODUCER),
               _expr_order(-1),
               _always_true(false),
-              _probe_ctx(nullptr),
               _is_ignored(false),
               registration_time_(MonotonicMillis()) {}
 
@@ -174,21 +171,8 @@ public:
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
-    // get push down expr context
-    // This function can only be called once
-    // _wrapper's function will be clear
-    // only consumer could call this
-    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
-
     Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
 
-    // This function is used by UT and producer
-    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs, 
ExprContext* probe_ctx);
-
-    // This function can be called multiple times
-    Status get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
-                                const RowDescriptor& desc);
-
     Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* 
push_vexprs,
                                const RowDescriptor& desc);
 
@@ -347,23 +331,12 @@ protected:
     // this filter won't filter any data
     bool _always_true;
 
-    // build expr_context
-    // ExprContext* _build_ctx;
-    // probe expr_context
-    // it only used in consumer to generate runtime_filter expr_context
-    // we don't have to prepare it or close it
-    ExprContext* _probe_ctx;
     doris::vectorized::VExprContext* _vprobe_ctx;
 
     // Indicate whether runtime filter expr has been ignored
     bool _is_ignored;
     std::string _ignored_msg;
 
-    // some runtime filter will generate
-    // multiple contexts such as minmax filter
-    // these context is called prepared by this,
-    // consumer_close should be called before release
-    std::vector<ExprContext*> _push_down_ctxs;
     std::vector<doris::vectorized::VExpr*> _push_down_vexprs;
 
     struct rpc_context;
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index c6fa1d2381..f050a1edb6 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -263,6 +263,5 @@ private:
     std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
 };
 
-using RuntimeFilterSlots = RuntimeFilterSlotsBase<ExprContext>;
 using VRuntimeFilterSlots = RuntimeFilterSlotsBase<vectorized::VExprContext>;
 } // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d0272fd8ad..c4880806e5 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -61,7 +61,6 @@ set(EXPRS_TEST_FILES
     exprs/encryption_functions_test.cpp
     exprs/math_functions_test.cpp
     exprs/topn_function_test.cpp
-    exprs/runtime_filter_test.cpp
     exprs/bloom_filter_predicate_test.cpp
     exprs/array_functions_test.cpp
     exprs/quantile_function_test.cpp
diff --git a/be/test/exprs/runtime_filter_test.cpp 
b/be/test/exprs/runtime_filter_test.cpp
deleted file mode 100644
index ee319ac6cb..0000000000
--- a/be/test/exprs/runtime_filter_test.cpp
+++ /dev/null
@@ -1,538 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exprs/runtime_filter.h"
-
-#include <array>
-#include <memory>
-
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/expr_context.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/Planner_types.h"
-#include "gen_cpp/Types_types.h"
-#include "gmock/gmock.h"
-#include "gtest/gtest.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
-
-namespace doris {
-TTypeDesc create_type_desc(PrimitiveType type, int precision, int scale);
-
-class RuntimeFilterTest : public testing::Test {
-public:
-    RuntimeFilterTest() {}
-    virtual void SetUp() {
-        ExecEnv* exec_env = ExecEnv::GetInstance();
-        exec_env = nullptr;
-        _runtime_stat.reset(
-                new RuntimeState(_fragment_id, _query_options, _query_globals, 
exec_env));
-        _runtime_stat->init_mem_trackers();
-    }
-    virtual void TearDown() { _obj_pool.clear(); }
-
-private:
-    ObjectPool _obj_pool;
-    TUniqueId _fragment_id;
-    TQueryOptions _query_options;
-    TQueryGlobals _query_globals;
-
-    std::unique_ptr<RuntimeState> _runtime_stat;
-    // std::unique_ptr<IRuntimeFilter> _runtime_filter;
-};
-
-IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, 
TQueryOptions* options,
-                                      RuntimeState* _runtime_stat, ObjectPool* 
_obj_pool) {
-    TRuntimeFilterDesc desc;
-    desc.__set_filter_id(0);
-    desc.__set_expr_order(0);
-    desc.__set_has_local_targets(true);
-    desc.__set_has_remote_targets(false);
-    desc.__set_is_broadcast_join(true);
-    desc.__set_type(type);
-    desc.__set_bloom_filter_size_bytes(4096);
-
-    // build src expr context
-
-    {
-        TExpr build_expr;
-        TExprNode expr_node;
-        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
-        expr_node.__set_type(create_type_desc(TYPE_INT));
-        expr_node.__set_num_children(0);
-        expr_node.__isset.slot_ref = true;
-        TSlotRef slot_ref;
-        slot_ref.__set_slot_id(0);
-        slot_ref.__set_tuple_id(0);
-        expr_node.__set_slot_ref(slot_ref);
-        expr_node.__isset.output_column = true;
-        expr_node.__set_output_column(0);
-        build_expr.nodes.push_back(expr_node);
-        desc.__set_src_expr(build_expr);
-    }
-    // build dst expr
-    {
-        TExpr target_expr;
-        TExprNode expr_node;
-        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
-        expr_node.__set_type(create_type_desc(TYPE_INT));
-        expr_node.__set_num_children(0);
-        expr_node.__isset.slot_ref = true;
-        TSlotRef slot_ref;
-        slot_ref.__set_slot_id(0);
-        slot_ref.__set_tuple_id(0);
-        expr_node.__set_slot_ref(slot_ref);
-        expr_node.__isset.output_column = true;
-        expr_node.__set_output_column(0);
-        target_expr.nodes.push_back(expr_node);
-        std::map<int, TExpr> planid_to_target_expr = {{0, target_expr}};
-        desc.__set_planId_to_target_expr(planid_to_target_expr);
-    }
-
-    IRuntimeFilter* runtime_filter = nullptr;
-    Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc, 
options,
-                                           RuntimeFilterRole::PRODUCER, -1, 
&runtime_filter);
-
-    EXPECT_TRUE(status.ok()) << status.to_string();
-
-    if (auto bf = runtime_filter->get_bloomfilter()) {
-        status = bf->init_with_fixed_length();
-        EXPECT_TRUE(status.ok()) << status.to_string();
-    }
-
-    return status.ok() ? runtime_filter : nullptr;
-}
-
-std::vector<TupleRow>* create_rows(ObjectPool* _obj_pool, int from, int to) {
-    auto& rows = *(_obj_pool->add(new std::vector<TupleRow>(to - from + 1)));
-    int i = 0;
-    while (from + i <= to) {
-        std::array<int, 2>* data = _obj_pool->add(new std::array<int, 2>());
-        data->at(0) = data->at(1) = from + i;
-        TupleRow row;
-        row._tuples[0] = (Tuple*)data->data();
-        rows[i++] = row;
-    }
-    return &rows;
-}
-
-void insert(IRuntimeFilter* runtime_filter, ExprContext* expr_ctx, 
std::vector<TupleRow>* rows) {
-    for (TupleRow& row : *rows) {
-        void* val = expr_ctx->get_value(&row);
-        runtime_filter->insert(val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    auto tuple_rows = create_rows(&_obj_pool, 1, 1024);
-    auto not_exist_data = create_rows(&_obj_pool, 1025, 2048);
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 1024;
-
-    IRuntimeFilter* runtime_filter = 
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-                                                           
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter, build_expr_ctx, tuple_rows);
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_TRUE(!expr_context_list.empty());
-
-    // test data in
-    for (TupleRow& row : *tuple_rows) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    // test not exist data
-    for (TupleRow& row : *not_exist_data) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_merge_in_filter_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 1024 * 2 + 1;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 1024);
-    auto rows2 = create_rows(&_obj_pool, 1025, 2048);
-
-    IRuntimeFilter* runtime_filter = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                           
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_TRUE(!expr_context_list.empty());
-
-    // test data in
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_ignore_in_filter_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 2;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 1);
-    auto rows2 = create_rows(&_obj_pool, 2, 2);
-
-    IRuntimeFilter* runtime_filter = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                           
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_TRUE(runtime_filter->is_ignored());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_TRUE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, runtime_filter_in_or_bloom_filter_in_merge_in_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 3;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 1);
-    auto rows2 = create_rows(&_obj_pool, 2, 2);
-
-    IRuntimeFilter* runtime_filter = create_runtime_filter(
-            TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(), 
&_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-    EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_FALSE(runtime_filter->is_ignored());
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_FALSE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_in_merge_in_upgrade_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 2;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 1);
-    auto rows2 = create_rows(&_obj_pool, 2, 2);
-
-    IRuntimeFilter* runtime_filter = create_runtime_filter(
-            TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(), 
&_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-    EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_FALSE(runtime_filter->is_ignored());
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_FALSE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_in_merge_bloom_filter_upgrade_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 100;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 1);
-    auto rows2 = create_rows(&_obj_pool, 2, 2);
-
-    IRuntimeFilter* runtime_filter = create_runtime_filter(
-            TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(), 
&_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-    EXPECT_TRUE(runtime_filter2->is_bloomfilter());
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_FALSE(runtime_filter->is_ignored());
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_FALSE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_bloom_filter_merge_in_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 3;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 3);
-    auto rows2 = create_rows(&_obj_pool, 4, 4);
-
-    IRuntimeFilter* runtime_filter = create_runtime_filter(
-            TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(), 
&_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-    runtime_filter->change_to_bloom_filter();
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::IN, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-    EXPECT_FALSE(runtime_filter2->is_bloomfilter());
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_FALSE(runtime_filter->is_ignored());
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_FALSE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-TEST_F(RuntimeFilterTest, 
runtime_filter_in_or_bloom_filter_bloom_filter_merge_bloom_filter_test) {
-    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
-    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
-    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
-
-    TQueryOptions options;
-    options.runtime_filter_max_in_num = 3;
-
-    auto rows1 = create_rows(&_obj_pool, 1, 3);
-    auto rows2 = create_rows(&_obj_pool, 4, 6);
-
-    IRuntimeFilter* runtime_filter = create_runtime_filter(
-            TRuntimeFilterType::IN_OR_BLOOM, &options, _runtime_stat.get(), 
&_obj_pool);
-    insert(runtime_filter, build_expr_ctx, rows1);
-    EXPECT_FALSE(runtime_filter->is_bloomfilter());
-    runtime_filter->change_to_bloom_filter();
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    IRuntimeFilter* runtime_filter2 = 
create_runtime_filter(TRuntimeFilterType::BLOOM, &options,
-                                                            
_runtime_stat.get(), &_obj_pool);
-    insert(runtime_filter2, build_expr_ctx, rows2);
-    EXPECT_TRUE(runtime_filter2->is_bloomfilter());
-
-    Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper());
-    EXPECT_TRUE(status.ok());
-    EXPECT_FALSE(runtime_filter->is_ignored());
-    EXPECT_TRUE(runtime_filter->is_bloomfilter());
-
-    // get expr context from filter
-
-    std::list<ExprContext*> expr_context_list;
-    EXPECT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, 
prob_expr_ctx).ok());
-    EXPECT_FALSE(expr_context_list.empty());
-
-    for (TupleRow& row : *rows1) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-    for (TupleRow& row : *rows2) {
-        for (ExprContext* ctx : expr_context_list) {
-            EXPECT_TRUE(ctx->get_boolean_val(&row).val);
-        }
-    }
-
-    // test null
-    for (ExprContext* ctx : expr_context_list) {
-        TupleRow row;
-        row._tuples[0] = nullptr;
-        EXPECT_FALSE(ctx->get_boolean_val(&row).val);
-    }
-}
-
-} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to