This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ed7f7dead9 [Refactor](push-down predicate) Derive push-down predicate 
from vconjuncts (#11468)
ed7f7dead9 is described below

commit ed7f7dead95a361422666da4a589ba1fbc807e2c
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 8 19:19:26 2022 +0800

    [Refactor](push-down predicate) Derive push-down predicate from vconjuncts 
(#11468)
    
    * [Refactor](push-down predicate) Derive push-down predicate from vconjuncts
---
 be/src/exec/olap_common.h                 |    5 +
 be/src/exec/olap_scan_node.h              |    1 -
 be/src/exec/olap_utils.h                  |   22 +
 be/src/exprs/bloomfilter_predicate.cpp    |    5 +-
 be/src/exprs/bloomfilter_predicate.h      |    2 +-
 be/src/exprs/runtime_filter.cpp           |   18 +-
 be/src/exprs/runtime_filter.h             |    2 +
 be/src/vec/exec/volap_scan_node.cpp       | 1470 +++++++++++++----------------
 be/src/vec/exec/volap_scan_node.h         |   95 +-
 be/src/vec/exec/volap_scanner.cpp         |    4 -
 be/src/vec/exec/volap_scanner.h           |    5 -
 be/src/vec/exprs/vbloom_predicate.cpp     |    4 +-
 be/src/vec/exprs/vbloom_predicate.h       |    4 +-
 be/src/vec/exprs/vexpr.cpp                |    2 +-
 be/src/vec/exprs/vexpr.h                  |   20 +
 be/src/vec/exprs/vexpr_context.cpp        |    9 +-
 be/src/vec/exprs/vexpr_context.h          |    9 +
 be/src/vec/exprs/vin_predicate.h          |    4 +
 be/src/vec/exprs/vruntimefilter_wrapper.h |    2 +
 be/src/vec/exprs/vslot_ref.h              |    2 +
 be/src/vec/functions/in.cpp               |  154 +--
 be/src/vec/functions/{in.cpp => in.h}     |    8 +-
 22 files changed, 821 insertions(+), 1026 deletions(-)

diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index fb303de5f6..2f37b05633 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -256,6 +256,11 @@ public:
         range.remove_fixed_value(*value);
     }
 
+    static void add_value_range(ColumnValueRange<primitive_type>& range, 
SQLFilterOp op,
+                                CppType* value) {
+        range.add_range(op, *value);
+    }
+
     static ColumnValueRange<primitive_type> create_empty_column_value_range() {
         return 
ColumnValueRange<primitive_type>::create_empty_column_value_range("");
     }
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index ea2b0dc70d..4097c710fe 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -158,7 +158,6 @@ protected:
                                                          int conj_idx, int 
child_idx);
 
     friend class OlapScanner;
-    friend class vectorized::VOlapScanner;
 
     // Tuple id resolved in prepare() to set _tuple_desc;
     TupleId _tuple_id;
diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index eb0cf52e98..1fe0e21f71 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -184,6 +184,9 @@ inline int get_olap_size(PrimitiveType type) {
     return 0;
 }
 
+template <PrimitiveType>
+static constexpr bool always_false_v = false;
+
 inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
     switch (type) {
     case TExprOpcode::LT:
@@ -215,4 +218,23 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type 
type, bool opposite) {
     return FILTER_IN;
 }
 
+inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool 
opposite) {
+    if (function_name == "lt") {
+        return opposite ? FILTER_LARGER : FILTER_LESS;
+    } else if (function_name == "gt") {
+        return opposite ? FILTER_LESS : FILTER_LARGER;
+    } else if (function_name == "le") {
+        return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL;
+    } else if (function_name == "ge") {
+        return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL;
+    } else if (function_name == "eq") {
+        return opposite ? FILTER_NOT_IN : FILTER_IN;
+    } else if (function_name == "ne") {
+        return opposite ? FILTER_IN : FILTER_NOT_IN;
+    } else {
+        DCHECK(false) << "Function Name: " << function_name;
+        return FILTER_IN;
+    }
+}
+
 } // namespace doris
diff --git a/be/src/exprs/bloomfilter_predicate.cpp 
b/be/src/exprs/bloomfilter_predicate.cpp
index 26cd7a209b..fcf8589ac6 100644
--- a/be/src/exprs/bloomfilter_predicate.cpp
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -43,12 +43,13 @@ BloomFilterPredicate::BloomFilterPredicate(const 
BloomFilterPredicate& other)
           _filtered_rows(),
           _scan_rows() {}
 
-Status BloomFilterPredicate::prepare(RuntimeState* state, 
IBloomFilterFuncBase* filter) {
+Status BloomFilterPredicate::prepare(RuntimeState* state,
+                                     std::shared_ptr<IBloomFilterFuncBase> 
filter) {
     // DCHECK(filter != nullptr);
     if (_is_prepare) {
         return Status::OK();
     }
-    _filter.reset(filter);
+    _filter = filter;
     if (nullptr == _filter.get()) {
         return Status::InternalError("Unknown column type.");
     }
diff --git a/be/src/exprs/bloomfilter_predicate.h 
b/be/src/exprs/bloomfilter_predicate.h
index 7f420a2d8e..5fcf604518 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -365,7 +365,7 @@ public:
         return pool->add(new BloomFilterPredicate(*this));
     }
     using Predicate::prepare;
-    Status prepare(RuntimeState* state, IBloomFilterFuncBase* bloomfilterfunc);
+    Status prepare(RuntimeState* state, std::shared_ptr<IBloomFilterFuncBase> 
bloomfilterfunc);
 
     std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() { return 
_filter; }
 
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 5983b11fcc..89e832f3d0 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1000,7 +1000,7 @@ private:
     int32_t _max_in_num = -1;
     std::unique_ptr<MinMaxFuncBase> _minmax_func;
     std::unique_ptr<HybridSetBase> _hybrid_set;
-    std::unique_ptr<IBloomFilterFuncBase> _bloomfilter_func;
+    std::shared_ptr<IBloomFilterFuncBase> _bloomfilter_func;
     bool _is_bloomfilter = false;
     bool _is_ignored_in_filter = false;
     std::string* _ignored_in_filter_msg = nullptr;
@@ -1061,6 +1061,14 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctx
     return Status::OK();
 }
 
+Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* 
push_vexprs) {
+    DCHECK(is_consumer());
+    if (!_is_ignored) {
+        return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx);
+    }
+    return Status::OK();
+}
+
 Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* 
push_expr_ctxs,
                                           ExprContext* probe_ctx) {
     DCHECK(is_producer());
@@ -1649,7 +1657,7 @@ Status RuntimePredicateWrapper::get_push_context(T* 
container, RuntimeState* sta
         node.__isset.vector_opcode = true;
         node.__set_vector_opcode(to_in_opcode(_column_return_type));
         auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
-        RETURN_IF_ERROR(bloom_pred->prepare(state, 
_bloomfilter_func.release()));
+        RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func));
         bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
         ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
         container->push_back(ctx);
@@ -1677,6 +1685,8 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
     case RuntimeFilterType::IN_FILTER: {
         if (!_is_ignored_in_filter) {
             TTypeDesc type_desc = 
create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            type_desc.__set_is_nullable(
+                    _hybrid_set->size() > 0 ? true : 
vprob_expr->root()->is_nullable());
             TExprNode node;
             node.__set_type(type_desc);
             node.__set_node_type(TExprNodeType::IN_PRED);
@@ -1684,6 +1694,8 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
             node.__set_opcode(TExprOpcode::FILTER_IN);
             node.__isset.vector_opcode = true;
             node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            node.__set_is_nullable(_hybrid_set->size() > 0 ? true
+                                                           : 
vprob_expr->root()->is_nullable());
 
             // VInPredicate
             doris::vectorized::VExpr* expr = nullptr;
@@ -1732,12 +1744,14 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
     case RuntimeFilterType::BLOOM_FILTER: {
         // create a bloom filter
         TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(vprob_expr->root()->is_nullable());
         TExprNode node;
         node.__set_type(type_desc);
         node.__set_node_type(TExprNodeType::BLOOM_PRED);
         node.__set_opcode(TExprOpcode::RT_FILTER);
         node.__isset.vector_opcode = true;
         node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        node.__set_is_nullable(vprob_expr->root()->is_nullable());
         auto bloom_pred = _pool->add(new 
doris::vectorized::VBloomPredicate(node));
         bloom_pred->set_filter(_bloomfilter_func);
         auto cloned_vexpr = vprob_expr->root()->clone(_pool);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b4382d8874..587c0aa87a 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -154,6 +154,8 @@ public:
     // only consumer could call this
     Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
 
+    Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
+
     // This function is used by UT and producer
     Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs, 
ExprContext* probe_ctx);
 
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 4b4ddf313f..865b2d838b 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -30,12 +30,22 @@
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/exec/volap_scanner.h"
+#include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
+#include "vec/functions/in.h"
 
 namespace doris::vectorized {
 using doris::operator<<;
 
+#define RETURN_IF_PUSH_DOWN(stmt) \
+    if (!push_down) {             \
+        stmt;                     \
+    } else {                      \
+        return;                   \
+    }
+
 VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : ScanNode(pool, tnode, descs),
           _tuple_id(tnode.olap_scan_node.tuple_id),
@@ -59,7 +69,6 @@ VOlapScanNode::VOlapScanNode(ObjectPool* pool, const 
TPlanNode& tnode, const Des
 
 Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
-    _direct_conjunct_size = _conjunct_ctxs.size();
 
     const TQueryOptions& query_options = state->query_options();
     if (query_options.__isset.max_scan_key_num) {
@@ -244,6 +253,7 @@ Status VOlapScanNode::open(RuntimeState* state) {
     // acquire runtime filter
     _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
 
+    std::vector<VExpr*> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         auto& filter_desc = _runtime_filter_descs[i];
         IRuntimeFilter* runtime_filter = nullptr;
@@ -257,21 +267,12 @@ Status VOlapScanNode::open(RuntimeState* state) {
             ready = runtime_filter->await();
         }
         if (ready) {
-            std::list<ExprContext*> expr_context;
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
             _runtime_filter_ctxs[i].apply_mark = true;
             _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
-
-            for (auto ctx : expr_context) {
-                ctx->prepare(state, row_desc());
-                ctx->open(state);
-                int index = _conjunct_ctxs.size();
-                _conjunct_ctxs.push_back(ctx);
-                // it's safe to store address from a fix-resized vector
-                _conjunctid_to_runtime_filter_ctxs[index] = 
&_runtime_filter_ctxs[i];
-            }
         }
     }
+    RETURN_IF_ERROR(_append_rf_into_conjuncts(state, vexprs));
 
     return Status::OK();
 }
@@ -469,7 +470,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
                         if (_vconjunct_ctx_ptr) {
                             
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
                         }
-                        _vconjunct_ctx_ptr.reset(new 
doris::vectorized::VExprContext*);
+                        _vconjunct_ctx_ptr.reset(new VExprContext*);
                         *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
                         _runtime_filter_ready_flag[i] = true;
                     }
@@ -621,136 +622,41 @@ Status VOlapScanNode::_add_blocks(std::vector<Block*>& 
block) {
     return Status::OK();
 }
 
-void VOlapScanNode::eval_const_conjuncts() {
-    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
-        // if conjunct is constant, compute direct and set eos = true
-        if (_conjunct_ctxs[conj_idx]->root()->is_constant()) {
-            void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr);
-            if (value == nullptr || *reinterpret_cast<bool*>(value) == false) {
-                _eos = true;
-                break;
-            }
-        }
-    }
-}
-
 Status VOlapScanNode::normalize_conjuncts() {
     std::vector<SlotDescriptor*> slots = _tuple_desc->slots();
 
     for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
         switch (slots[slot_idx]->type().type) {
-        case TYPE_TINYINT: {
-            ColumnValueRange<TYPE_TINYINT> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_SMALLINT: {
-            ColumnValueRange<TYPE_SMALLINT> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_INT: {
-            ColumnValueRange<TYPE_INT> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_BIGINT: {
-            ColumnValueRange<TYPE_BIGINT> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_LARGEINT: {
-            ColumnValueRange<TYPE_LARGEINT> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_CHAR: {
-            ColumnValueRange<TYPE_CHAR> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-        case TYPE_VARCHAR: {
-            ColumnValueRange<TYPE_VARCHAR> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-        case TYPE_HLL: {
-            ColumnValueRange<TYPE_HLL> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-        case TYPE_STRING: {
-            ColumnValueRange<TYPE_STRING> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DATE: {
-            ColumnValueRange<TYPE_DATE> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-        case TYPE_DATETIME: {
-            ColumnValueRange<TYPE_DATETIME> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DATEV2: {
-            ColumnValueRange<TYPE_DATEV2> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DATETIMEV2: {
-            ColumnValueRange<TYPE_DATETIMEV2> 
range(slots[slot_idx]->col_name(),
-                                                    
slots[slot_idx]->type().precision,
-                                                    
slots[slot_idx]->type().scale);
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DECIMALV2: {
-            ColumnValueRange<TYPE_DECIMALV2> 
range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DECIMAL32: {
-            ColumnValueRange<TYPE_DECIMAL32> range(slots[slot_idx]->col_name(),
-                                                   
slots[slot_idx]->type().precision,
-                                                   
slots[slot_idx]->type().scale);
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DECIMAL64: {
-            ColumnValueRange<TYPE_DECIMAL64> range(slots[slot_idx]->col_name(),
-                                                   
slots[slot_idx]->type().precision,
-                                                   
slots[slot_idx]->type().scale);
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_DECIMAL128: {
-            ColumnValueRange<TYPE_DECIMAL128> 
range(slots[slot_idx]->col_name(),
-                                                    
slots[slot_idx]->type().precision,
-                                                    
slots[slot_idx]->type().scale);
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
-        case TYPE_BOOLEAN: {
-            ColumnValueRange<TYPE_BOOLEAN> range(slots[slot_idx]->col_name());
-            normalize_predicate(range, slots[slot_idx]);
-            break;
-        }
-
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        ColumnValueRange<TYPE_##NAME> range(slots[slot_idx]->col_name(),       
\
+                                            slots[slot_idx]->type().precision, 
\
+                                            slots[slot_idx]->type().scale);    
\
+        _id_to_slot_column_value_range[slots[slot_idx]->id()] =                
\
+                std::pair {slots[slot_idx], range};                            
\
+        break;                                                                 
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)
+            APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
         default: {
             VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << 
slots[slot_idx]->col_name()
                           << "]";
@@ -758,6 +664,28 @@ Status VOlapScanNode::normalize_conjuncts() {
         }
         }
     }
+    if (_vconjunct_ctx_ptr) {
+        if ((*_vconjunct_ctx_ptr)->root()) {
+            VExpr* new_root = _normalize_predicate(_runtime_state, 
(*_vconjunct_ctx_ptr)->root());
+            if (new_root) {
+                (*_vconjunct_ctx_ptr)->set_root(new_root);
+            } else {
+                (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+                _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+                _vconjunct_ctx_ptr.reset(nullptr);
+            }
+        }
+    }
+    for (auto& it : _id_to_slot_column_value_range) {
+        std::visit(
+                [&](auto&& range) {
+                    if (range.is_empty_value_range()) {
+                        _eos = true;
+                    }
+                },
+                it.second.second);
+        _column_value_ranges[it.second.first->col_name()] = it.second.second;
+    }
 
     return Status::OK();
 }
@@ -786,67 +714,6 @@ static std::string olap_filters_to_string(const 
std::vector<doris::TCondition>&
     return filters_string;
 }
 
-Status VOlapScanNode::build_function_filters() {
-    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
-        ExprContext* ex_ctx = _conjunct_ctxs[conj_idx];
-        Expr* fn_expr = ex_ctx->root();
-        bool opposite = false;
-
-        if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() &&
-            TExprOpcode::COMPOUND_NOT == fn_expr->op()) {
-            fn_expr = fn_expr->get_child(0);
-            opposite = true;
-        }
-
-        // currently only support like / not like
-        if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type() &&
-            "like" == fn_expr->fn().name.function_name) {
-            doris_udf::FunctionContext* func_cxt =
-                    ex_ctx->fn_context(fn_expr->get_fn_context_index());
-
-            if (!func_cxt) {
-                continue;
-            }
-            if (fn_expr->children().size() != 2) {
-                continue;
-            }
-            SlotRef* slot_ref = nullptr;
-            Expr* literal_expr = nullptr;
-
-            if (TExprNodeType::SLOT_REF == fn_expr->get_child(0)->node_type()) 
{
-                literal_expr = fn_expr->get_child(1);
-                slot_ref = (SlotRef*)(fn_expr->get_child(0));
-            } else if (TExprNodeType::SLOT_REF == 
fn_expr->get_child(1)->node_type()) {
-                literal_expr = fn_expr->get_child(0);
-                slot_ref = (SlotRef*)(fn_expr->get_child(1));
-            } else {
-                continue;
-            }
-
-            if (TExprNodeType::STRING_LITERAL != literal_expr->node_type()) 
continue;
-
-            const SlotDescriptor* slot_desc = nullptr;
-            std::vector<SlotId> slot_ids;
-            slot_ref->get_slot_ids(&slot_ids);
-            for (SlotDescriptor* slot : _tuple_desc->slots()) {
-                if (slot->id() == slot_ids[0]) {
-                    slot_desc = slot;
-                    break;
-                }
-            }
-
-            if (!slot_desc) {
-                continue;
-            }
-            std::string col = slot_desc->col_name();
-            StringVal val = literal_expr->get_string_val(ex_ctx, nullptr);
-            _push_down_functions.emplace_back(opposite, col, func_cxt, val);
-            _pushed_func_conjuncts_index.insert(conj_idx);
-        }
-    }
-    return Status::OK();
-}
-
 Status VOlapScanNode::build_key_ranges_and_filters() {
     const std::vector<std::string>& column_names = 
_olap_scan_node.key_column_name;
     const std::vector<TPrimitiveType::type>& column_types = 
_olap_scan_node.key_column_type;
@@ -898,62 +765,23 @@ Status VOlapScanNode::build_key_ranges_and_filters() {
 Status VOlapScanNode::start_scan(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
 
-    VLOG_CRITICAL << "Eval Const Conjuncts";
-    // 1. Eval const conjuncts to find whether eos = true
-    eval_const_conjuncts();
-
     VLOG_CRITICAL << "NormalizeConjuncts";
-    // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts 
may
-    // set eos = true
     RETURN_IF_ERROR(normalize_conjuncts());
 
-    // 1 and 2 step dispose find conjuncts set eos = true, return directly
     if (_eos) {
         return Status::OK();
     }
 
     VLOG_CRITICAL << "BuildKeyRangesAndFilters";
-    // 3.1 Using `Key Column`'s ColumnValueRange to split ScanRange to several 
`Sub ScanRange`
     RETURN_IF_ERROR(build_key_ranges_and_filters());
-    // 3.2 Function pushdown
-    if (config::enable_function_pushdown) 
RETURN_IF_ERROR(build_function_filters());
-
-    VLOG_CRITICAL << "Filter idle conjuncts";
-    // 4.1 Filter idle conjunct which already trans to olap filters
-    // this must be after build_scan_key, it will free the StringValue memory
-    remove_pushed_conjuncts(state);
 
     VLOG_CRITICAL << "StartScanThread";
-    // 5. Start multi thread to read several `Sub Sub ScanRange`
     RETURN_IF_ERROR(start_scan_thread(state));
 
     return Status::OK();
 }
 
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_predicate(ColumnValueRange<T>& range, 
SlotDescriptor* slot) {
-    // 1. Normalize InPredicate, add to ColumnValueRange
-    RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range));
-
-    // 2. Normalize NotInPredicate, add to ColumnValueRange
-    RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range));
-
-    // 3. Normalize BinaryPredicate , add to ColumnValueRange
-    RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));
-
-    // 3. Normalize BloomFilterPredicate, push down by hash join node
-    RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot));
-
-    // 4. Check whether range is empty, set _eos
-    if (range.is_empty_value_range()) _eos = true;
-
-    // 5. Add range to Column->ColumnValueRange map
-    _column_value_ranges[slot->col_name()] = range;
-
-    return Status::OK();
-}
-
-static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
+static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
     if ((slot->type().is_date_type() || slot->type().is_date_v2_type() ||
          slot->type().is_datetime_v2_type()) &&
         (expr->type().is_date_type() || expr->type().is_date_v2_type() ||
@@ -966,152 +794,83 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* 
expr) {
     return false;
 }
 
-bool VOlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
-                                                  doris::InPredicate* pred) {
-    if (Expr::type_without_cast(pred->get_child(0)) != 
TExprNodeType::SLOT_REF) {
-        // not a slot ref(column)
-        return false;
-    }
-
-    std::vector<SlotId> slot_ids;
-    if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) {
-        // not a single column predicate
-        return false;
-    }
-
-    if (slot_ids[0] != slot->id()) {
-        // predicate not related to current column
-        return false;
-    }
-
-    if (pred->get_child(0)->type().type != slot->type().type) {
-        if (!ignore_cast(slot, pred->get_child(0))) {
-            // the type of predicate not match the slot's type
-            return false;
-        }
-    }
-
-    VLOG_CRITICAL << slot->col_name() << " fixed_values add num: " << 
pred->hybrid_set()->size();
-
-    // if there are too many elements in InPredicate, exceed the limit,
-    // we will not push any condition of this column to storage engine.
-    // because too many conditions pushed down to storage engine may even
-    // slow down the query process.
-    // ATTN: This is just an experience value. You may need to try
-    // different thresholds to improve performance.
-    if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) {
-        VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << 
" exceed limit "
-                    << _max_pushdown_conditions_per_column;
-        return false;
-    }
-
-    return true;
-}
-
-std::pair<bool, void*> 
VOlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot,
-                                                                    
doris::Expr* pred, int conj_idx,
-                                                                    int 
child_idx) {
-    auto result_pair = std::make_pair<bool, void*>(false, nullptr);
-
-    // Do not get slot_ref of column, should not push_down to Storage Engine
-    if (Expr::type_without_cast(pred->get_child(child_idx)) != 
TExprNodeType::SLOT_REF) {
-        return result_pair;
-    }
-
-    std::vector<SlotId> slot_ids;
-    if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) {
-        // not a single column predicate
-        return result_pair;
-    }
-
-    if (slot_ids[0] != slot->id()) {
-        // predicate not related to current column
-        return result_pair;
-    }
-
-    if (pred->get_child(child_idx)->type().type != slot->type().type) {
-        if (!ignore_cast(slot, pred->get_child(child_idx))) {
-            // the type of predicate not match the slot's type
-            return result_pair;
+template <bool IsFixed, PrimitiveType PrimitiveType, typename 
ChangeFixedValueRangeFunc>
+Status VOlapScanNode::change_value_range(ColumnValueRange<PrimitiveType>& 
temp_range, void* value,
+                                         const ChangeFixedValueRangeFunc& func,
+                                         const std::string& fn_name, int 
slot_ref_child) {
+    if constexpr (PrimitiveType == TYPE_DATE) {
+        DateTimeValue date_value;
+        
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+        if constexpr (IsFixed) {
+            if (!date_value.check_loss_accuracy_cast_to_date()) {
+                func(temp_range,
+                     reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                             &date_value));
+            }
+        } else {
+            if (date_value.check_loss_accuracy_cast_to_date()) {
+                if (fn_name == "lt" || fn_name == "ge") {
+                    ++date_value;
+                }
+            }
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                         &date_value));
         }
-    }
-
-    Expr* expr = pred->get_child(1 - child_idx);
-    if (!expr->is_constant()) {
-        // only handle constant value
-        return result_pair;
-    }
-
-    // get value in result pair
-    result_pair = std::make_pair(
-            true, _conjunct_ctxs[conj_idx]->get_value(expr, nullptr, 
slot->type().precision,
-                                                      slot->type().scale));
-
-    return result_pair;
-}
-
-template <PrimitiveType primitive_type, typename ChangeFixedValueRangeFunc>
-Status 
VOlapScanNode::change_fixed_value_range(ColumnValueRange<primitive_type>& 
temp_range,
-                                               void* value, const 
ChangeFixedValueRangeFunc& func) {
-    switch (primitive_type) {
-    case TYPE_DATE: {
-        DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
-        // There is must return empty data in olap_scan_node,
-        // Because data value loss accuracy
-        if (!date_value.check_loss_accuracy_cast_to_date()) {
+    } else if constexpr (PrimitiveType == TYPE_DATETIME) {
+        DateTimeValue date_value;
+        
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+        if constexpr (IsFixed) {
             func(temp_range,
-                 reinterpret_cast<typename 
PrimitiveTypeTraits<primitive_type>::CppType*>(
+                 reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
                          &date_value));
+        } else {
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                         reinterpret_cast<char*>(&date_value)));
         }
-        break;
-    }
-    case TYPE_DECIMALV2:
-    case TYPE_CHAR:
-    case TYPE_VARCHAR:
-    case TYPE_HLL:
-    case TYPE_DATETIME:
-    case TYPE_DATETIMEV2:
-    case TYPE_TINYINT:
-    case TYPE_SMALLINT:
-    case TYPE_INT:
-    case TYPE_BIGINT:
-    case TYPE_LARGEINT:
-    case TYPE_DECIMAL32:
-    case TYPE_DECIMAL64:
-    case TYPE_DECIMAL128:
-    case TYPE_STRING: {
-        func(temp_range,
-             reinterpret_cast<typename 
PrimitiveTypeTraits<primitive_type>::CppType*>(value));
-        break;
-    }
-    case TYPE_BOOLEAN: {
-        bool v = *reinterpret_cast<bool*>(value);
-        func(temp_range,
-             reinterpret_cast<typename 
PrimitiveTypeTraits<primitive_type>::CppType*>(&v));
-        break;
-    }
-    case TYPE_DATEV2: {
+    } else if constexpr (PrimitiveType == TYPE_DATEV2) {
         DateV2Value<DateTimeV2ValueType> datetimev2_value =
                 *reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
-        if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
-            DateV2Value<DateV2ValueType> date_v2;
+        if constexpr (IsFixed) {
+            if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+                DateV2Value<DateV2ValueType> date_v2;
+                date_v2.set_date_uint32(
+                        binary_cast<DateV2Value<DateTimeV2ValueType>, 
uint64_t>(datetimev2_value) >>
+                        TIME_PART_LENGTH);
+                func(temp_range, &date_v2);
+            }
+        } else {
+            doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
             date_v2.set_date_uint32(
                     binary_cast<DateV2Value<DateTimeV2ValueType>, 
uint64_t>(datetimev2_value) >>
                     TIME_PART_LENGTH);
-            if constexpr (primitive_type == PrimitiveType::TYPE_DATEV2) {
-                func(temp_range, &date_v2);
-            } else {
-                __builtin_unreachable();
+            if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+                if (fn_name == "lt" || fn_name == "ge") {
+                    ++date_v2;
+                }
             }
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), 
&date_v2);
+        }
+    } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType 
== TYPE_CHAR) ||
+                         (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == 
TYPE_HLL) ||
+                         (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType 
== TYPE_TINYINT) ||
+                         (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == 
TYPE_INT) ||
+                         (PrimitiveType == TYPE_BIGINT) || (PrimitiveType == 
TYPE_LARGEINT) ||
+                         (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType 
== TYPE_DECIMAL64) ||
+                         (PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType 
== TYPE_STRING) ||
+                         (PrimitiveType == TYPE_BOOLEAN)) {
+        if constexpr (IsFixed) {
+            func(temp_range,
+                 reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
+        } else {
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename 
PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
         }
-        break;
-    }
-    default: {
-        LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. 
[type="
-                     << primitive_type << "]";
-        return Status::InternalError("Normalize filter fail, Unsupported 
Primitive type");
-    }
+    } else {
+        static_assert(always_false_v<PrimitiveType>);
     }
+
     return Status::OK();
 }
 
@@ -1127,440 +886,6 @@ bool VOlapScanNode::is_key_column(const std::string& 
key_name) {
     return res != _olap_scan_node.key_column_name.end();
 }
 
-void VOlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
-    if (_pushed_conjuncts_index.empty() && 
_pushed_func_conjuncts_index.empty()) {
-        return;
-    }
-
-    // dispose direct conjunct first
-    std::vector<ExprContext*> new_conjunct_ctxs;
-    for (int i = 0; i < _direct_conjunct_size; ++i) {
-        if (!_pushed_conjuncts_index.empty() && 
_pushed_conjuncts_index.count(i)) {
-            _conjunct_ctxs[i]->close(state); // pushed condition, just close
-        } else if (!_pushed_func_conjuncts_index.empty() && 
_pushed_func_conjuncts_index.count(i)) {
-            _pushed_func_conjunct_ctxs.emplace_back(
-                    _conjunct_ctxs[i]); // pushed functions, need keep ctxs
-        } else {
-            new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
-        }
-    }
-    auto new_direct_conjunct_size = new_conjunct_ctxs.size();
-
-    // dispose hash join push down conjunct second
-    for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
-        if (!_pushed_conjuncts_index.empty() && 
_pushed_conjuncts_index.count(i)) {
-            _conjunct_ctxs[i]->close(state); // pushed condition, just close
-        } else if (!_pushed_func_conjuncts_index.empty() && 
_pushed_func_conjuncts_index.count(i)) {
-            _pushed_func_conjunct_ctxs.emplace_back(
-                    _conjunct_ctxs[i]); // pushed functions, need keep ctxs
-        } else {
-            new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
-        }
-    }
-
-    _conjunct_ctxs = std::move(new_conjunct_ctxs);
-    _direct_conjunct_size = new_direct_conjunct_size;
-
-    // TODO: support vbloom_filter_predicate/vbinary_predicate and merge 
unpushed predicate to _vconjunct_ctx
-    for (auto push_down_ctx : _pushed_conjuncts_index) {
-        auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx);
-        if (iter != _conjunctid_to_runtime_filter_ctxs.end()) {
-            iter->second->runtimefilter->set_push_down_profile();
-        }
-    }
-
-    // set vconjunct_ctx is empty, if all conjunct
-    if (_direct_conjunct_size == 0) {
-        if (_vconjunct_ctx_ptr != nullptr) {
-            (*_vconjunct_ctx_ptr)->close(state);
-            _vconjunct_ctx_ptr = nullptr;
-        }
-    }
-
-    // filter idle conjunct in vexpr_contexts
-    auto checker = [&](int index) { return 
_pushed_conjuncts_index.count(index); };
-    _peel_pushed_vconjunct(state, checker);
-}
-
-// Construct the ColumnValueRange for one specified column
-// It will only handle the InPredicate and eq BinaryPredicate in conjunct_ctxs.
-// It will try to push down conditions of that column as much as possible,
-// But if the number of conditions exceeds the limit, none of conditions will 
be pushed down.
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
-                                                    ColumnValueRange<T>* 
range) {
-    std::vector<uint32_t> filter_conjuncts_index;
-    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
-        // create empty range as temp range, temp range should do intersection 
on range
-        auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(
-                slot->type().precision, slot->type().scale);
-
-        // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
-        if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
-            InPredicate* pred = 
static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
-            if (!should_push_down_in_predicate(slot, pred)) {
-                continue;
-            }
-
-            // begin to push InPredicate value into ColumnValueRange
-            HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
-            while (iter->has_next()) {
-                // column in (nullptr) is always false so continue to
-                // dispose next item
-                if (nullptr == iter->get_value()) {
-                    iter->next();
-                    continue;
-                }
-                auto value = const_cast<void*>(iter->get_value());
-                RETURN_IF_ERROR(change_fixed_value_range(
-                        temp_range, value, 
ColumnValueRange<T>::add_fixed_value_range));
-                iter->next();
-            }
-
-            if (is_key_column(slot->col_name())) {
-                filter_conjuncts_index.emplace_back(conj_idx);
-            }
-            range->intersection(temp_range);
-        } // end of handle in predicate
-        // 2. Normalize eq conjuncts like 'where col = value'
-        else if (TExprNodeType::BINARY_PRED == 
_conjunct_ctxs[conj_idx]->root()->node_type() &&
-                 FILTER_IN == 
to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
-            Expr* pred = _conjunct_ctxs[conj_idx]->root();
-            DCHECK(pred->get_num_children() == 2);
-
-            for (int child_idx = 0; child_idx < 2; ++child_idx) {
-                // TODO: should use C++17 structured bindlings to refactor 
this code in the future:
-                // 'auto [should_push_down, value] = 
should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
-                // make code tidier and readabler
-                auto result_pair = should_push_down_eq_predicate(slot, pred, 
conj_idx, child_idx);
-                if (!result_pair.first) {
-                    continue;
-                }
-
-                auto value = result_pair.second;
-                // where A = nullptr should return empty result set
-                if (value != nullptr) {
-                    RETURN_IF_ERROR(change_fixed_value_range(
-                            temp_range, value, 
ColumnValueRange<T>::add_fixed_value_range));
-                }
-
-                if (is_key_column(slot->col_name())) {
-                    filter_conjuncts_index.emplace_back(conj_idx);
-                }
-                range->intersection(temp_range);
-            } // end for each binary predicate child
-        }     // end of handling eq binary predicate
-    }
-
-    // exceed limit, no conditions will be pushed down to storage engine.
-    if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) {
-        range->set_whole_value_range();
-    } else {
-        std::copy(filter_conjuncts_index.cbegin(), 
filter_conjuncts_index.cend(),
-                  std::inserter(_pushed_conjuncts_index, 
_pushed_conjuncts_index.begin()));
-    }
-    return Status::OK();
-}
-
-// Construct the ColumnValueRange for one specified column
-// It will only handle the NotInPredicate and not eq BinaryPredicate in 
conjunct_ctxs.
-// It will try to push down conditions of that column as much as possible,
-// But if the number of conditions exceeds the limit, none of conditions will 
be pushed down.
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* 
slot,
-                                                            
ColumnValueRange<T>* range) {
-    // If the conjunct of slot is fixed value, will change the fixed value set 
of column value range
-    // else add value to not in range and push down predicate directly
-    bool is_fixed_range = range->is_fixed_value_range();
-    auto not_in_range = 
ColumnValueRange<T>::create_empty_column_value_range(range->column_name());
-
-    std::vector<uint32_t> filter_conjuncts_index;
-    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
-        // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)'
-        if (TExprOpcode::FILTER_NOT_IN == 
_conjunct_ctxs[conj_idx]->root()->op()) {
-            InPredicate* pred = 
static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
-            if (!should_push_down_in_predicate(slot, pred)) {
-                continue;
-            }
-
-            // begin to push InPredicate value into ColumnValueRange
-            auto iter = pred->hybrid_set()->begin();
-            while (iter->has_next()) {
-                // column not in (nullptr) is always true
-                if (nullptr == iter->get_value()) {
-                    continue;
-                }
-                auto value = const_cast<void*>(iter->get_value());
-                if (is_fixed_range) {
-                    RETURN_IF_ERROR(change_fixed_value_range(
-                            *range, value, 
ColumnValueRange<T>::remove_fixed_value_range));
-                } else {
-                    RETURN_IF_ERROR(change_fixed_value_range(
-                            not_in_range, value, 
ColumnValueRange<T>::add_fixed_value_range));
-                }
-                iter->next();
-            }
-
-            // only where a in ('a', 'b', nullptr) contain nullptr will
-            // clear temp_range to whole range, no need do intersection
-            if (is_key_column(slot->col_name())) {
-                filter_conjuncts_index.emplace_back(conj_idx);
-            }
-        } // end of handle not in predicate
-
-        // 2. Normalize eq conjuncts like 'where col != value'
-        if (TExprNodeType::BINARY_PRED == 
_conjunct_ctxs[conj_idx]->root()->node_type() &&
-            FILTER_NOT_IN == 
to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
-            Expr* pred = _conjunct_ctxs[conj_idx]->root();
-            DCHECK(pred->get_num_children() == 2);
-
-            for (int child_idx = 0; child_idx < 2; ++child_idx) {
-                // TODO: should use C++17 structured bindlings to refactor 
this code in the future:
-                // 'auto [should_push_down, value] = 
should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
-                // make code tidier and readabler
-                auto result_pair = should_push_down_eq_predicate(slot, pred, 
conj_idx, child_idx);
-                if (!result_pair.first) {
-                    continue;
-                }
-                auto value = result_pair.second;
-
-                if (is_fixed_range) {
-                    RETURN_IF_ERROR(change_fixed_value_range(
-                            *range, value, 
ColumnValueRange<T>::remove_fixed_value_range));
-                } else {
-                    RETURN_IF_ERROR(change_fixed_value_range(
-                            not_in_range, value, 
ColumnValueRange<T>::add_fixed_value_range));
-                }
-
-                if (is_key_column(slot->col_name())) {
-                    filter_conjuncts_index.emplace_back(conj_idx);
-                }
-            } // end for each binary predicate child
-        }     // end of handling eq binary predicate
-    }
-
-    // exceed limit, no conditions will be pushed down to storage engine.
-    if (is_fixed_range ||
-        not_in_range.get_fixed_value_size() <= 
_max_pushdown_conditions_per_column) {
-        if (!is_fixed_range) {
-            // push down not in condition to storage engine
-            not_in_range.to_in_condition(_olap_filter, false);
-        }
-        std::copy(filter_conjuncts_index.cbegin(), 
filter_conjuncts_index.cend(),
-                  std::inserter(_pushed_conjuncts_index, 
_pushed_conjuncts_index.begin()));
-    }
-    return Status::OK();
-}
-
-template <PrimitiveType T>
-bool VOlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* 
slot,
-                                                const std::string& is_null_str,
-                                                ColumnValueRange<T>* range) {
-    if (expr->node_type() != TExprNodeType::SLOT_REF) {
-        return false;
-    }
-
-    std::vector<SlotId> slot_ids;
-    if (1 != expr->get_slot_ids(&slot_ids)) {
-        return false;
-    }
-
-    if (slot_ids[0] != slot->id()) {
-        return false;
-    }
-
-    auto temp_range = 
ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
-                                                                           
slot->type().scale);
-    temp_range.set_contain_null(is_null_str == "null");
-    range->intersection(temp_range);
-
-    return true;
-}
-
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
-                                                       ColumnValueRange<T>* 
range) {
-    std::vector<uint32_t> filter_conjuncts_index;
-
-    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
-        Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
-        if (TExprNodeType::BINARY_PRED != root_expr->node_type() ||
-            FILTER_IN == to_olap_filter_type(root_expr->op(), false) ||
-            FILTER_NOT_IN == to_olap_filter_type(root_expr->op(), false)) {
-            if (TExprNodeType::FUNCTION_CALL == root_expr->node_type()) {
-                std::string is_null_str;
-                // 1. dispose the where pred "A is null" and "A is not null"
-                if (root_expr->is_null_scalar_function(is_null_str) &&
-                    normalize_is_null_predicate(root_expr->get_child(0), slot, 
is_null_str,
-                                                range)) {
-                    // if column is key column should push down conjunct 
storage engine
-                    if (is_key_column(slot->col_name())) {
-                        filter_conjuncts_index.emplace_back(conj_idx);
-                    }
-                }
-            }
-            continue;
-        }
-
-        // 2. dispose the where pred "A <,<=" and "A >,>="
-        Expr* pred = _conjunct_ctxs[conj_idx]->root();
-        DCHECK(pred->get_num_children() == 2);
-
-        for (int child_idx = 0; child_idx < 2; ++child_idx) {
-            if (Expr::type_without_cast(pred->get_child(child_idx)) != 
TExprNodeType::SLOT_REF) {
-                continue;
-            }
-            if (pred->get_child(child_idx)->type().type != slot->type().type) {
-                if (!ignore_cast(slot, pred->get_child(child_idx))) {
-                    continue;
-                }
-            }
-
-            std::vector<SlotId> slot_ids;
-
-            if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) {
-                if (slot_ids[0] != slot->id()) {
-                    continue;
-                }
-
-                Expr* expr = pred->get_child(1 - child_idx);
-
-                // for case: where col_a > col_b
-                if (!expr->is_constant()) {
-                    continue;
-                }
-
-                void* value = _conjunct_ctxs[conj_idx]->get_value(
-                        expr, nullptr, slot->type().precision, 
slot->type().scale);
-                // for case: where col > null
-                if (value == nullptr) {
-                    continue;
-                }
-
-                switch (slot->type().type) {
-                case TYPE_DATE: {
-                    DateTimeValue date_value = 
*reinterpret_cast<DateTimeValue*>(value);
-                    // NOTE: Datetime may be truncated to a date column, so we 
call ++operator for date_value
-                    //  for example: '2010-01-01 00:00:01' will be truncate to 
'2010-01-01'
-                    if (date_value.check_loss_accuracy_cast_to_date()) {
-                        if (pred->op() == TExprOpcode::LT || pred->op() == 
TExprOpcode::GE) {
-                            ++date_value;
-                        }
-                    }
-                    range->add_range(to_olap_filter_type(pred->op(), 
child_idx),
-                                     *reinterpret_cast<typename 
PrimitiveTypeTraits<T>::CppType*>(
-                                             &date_value));
-                    break;
-                }
-                case TYPE_DATEV2: {
-                    DateV2Value<DateTimeV2ValueType> datetimev2_value =
-                            
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
-                    doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
-                    
date_v2.set_date_uint32(binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(
-                                                    datetimev2_value) >>
-                                            TIME_PART_LENGTH);
-                    if 
(!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
-                        if (pred->op() == TExprOpcode::LT || pred->op() == 
TExprOpcode::GE) {
-                            ++date_v2;
-                        }
-                    }
-                    if constexpr (T == PrimitiveType::TYPE_DATEV2) {
-                        range->add_range(to_olap_filter_type(pred->op(), 
child_idx), date_v2);
-                        break;
-                    } else {
-                        __builtin_unreachable();
-                    }
-                }
-                case TYPE_TINYINT:
-                case TYPE_DECIMALV2:
-                case TYPE_DECIMAL32:
-                case TYPE_DECIMAL64:
-                case TYPE_DECIMAL128:
-                case TYPE_CHAR:
-                case TYPE_VARCHAR:
-                case TYPE_HLL:
-                case TYPE_DATETIME:
-                case TYPE_DATETIMEV2:
-                case TYPE_SMALLINT:
-                case TYPE_INT:
-                case TYPE_BIGINT:
-                case TYPE_LARGEINT:
-                case TYPE_BOOLEAN:
-                case TYPE_STRING: {
-                    range->add_range(
-                            to_olap_filter_type(pred->op(), child_idx),
-                            *reinterpret_cast<typename 
PrimitiveTypeTraits<T>::CppType*>(value));
-                    break;
-                }
-
-                default: {
-                    LOG(WARNING) << "Normalize filter fail, Unsupported 
Primitive type. [type="
-                                 << expr->type() << "]";
-                    return Status::InternalError(
-                            "Normalize filter fail, Unsupported Primitive 
type");
-                }
-                }
-
-                if (is_key_column(slot->col_name())) {
-                    filter_conjuncts_index.emplace_back(conj_idx);
-                }
-
-                VLOG_CRITICAL << slot->col_name() << " op: "
-                              << 
static_cast<int>(to_olap_filter_type(pred->op(), child_idx))
-                              << " value: "
-                              << *reinterpret_cast<typename 
PrimitiveTypeTraits<T>::CppType*>(
-                                         value);
-            }
-        }
-    }
-
-    std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
-              std::inserter(_pushed_conjuncts_index, 
_pushed_conjuncts_index.begin()));
-
-    return Status::OK();
-}
-
-Status VOlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
-    std::vector<uint32_t> filter_conjuncts_index;
-
-    for (int conj_idx = _direct_conjunct_size; conj_idx < 
_conjunct_ctxs.size(); ++conj_idx) {
-        Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
-        if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue;
-
-        Expr* pred = _conjunct_ctxs[conj_idx]->root();
-        DCHECK(pred->get_num_children() == 1);
-
-        if (Expr::type_without_cast(pred->get_child(0)) != 
TExprNodeType::SLOT_REF) {
-            continue;
-        }
-        if (pred->get_child(0)->type().type != slot->type().type) {
-            if (!ignore_cast(slot, pred->get_child(0))) {
-                continue;
-            }
-        }
-
-        std::vector<SlotId> slot_ids;
-
-        if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) {
-            if (slot_ids[0] != slot->id()) {
-                continue;
-            }
-            // only key column of bloom filter will push down to storage engine
-            if (is_key_column(slot->col_name())) {
-                filter_conjuncts_index.emplace_back(conj_idx);
-                _bloom_filters_push_down.emplace_back(
-                        slot->col_name(),
-                        
(reinterpret_cast<BloomFilterPredicate*>(pred))->get_bloom_filter_func());
-            }
-        }
-    }
-
-    std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
-              std::inserter(_pushed_conjuncts_index, 
_pushed_conjuncts_index.begin()));
-
-    return Status::OK();
-}
-
 Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
     if (_scan_ranges.empty()) {
         _transfer_done = true;
@@ -1700,8 +1025,6 @@ Status VOlapScanNode::close(RuntimeState* state) {
         DCHECK(runtime_filter != nullptr);
         runtime_filter->consumer_close();
     }
-    // pushed functions close
-    Expr::close(_pushed_func_conjunct_ctxs, state);
 
     for (auto& ctx : _stale_vexpr_ctxs) {
         (*ctx)->close(state);
@@ -2019,4 +1342,533 @@ Status VOlapScanNode::get_hints(TabletSharedPtr table, 
const TPaloScanRange& sca
     return Status::OK();
 }
 
+template <bool IsNotIn>
+bool VOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred, 
VExprContext* expr_ctx) {
+    if (pred->is_not_in() != IsNotIn) {
+        return false;
+    }
+    InState* state = reinterpret_cast<InState*>(
+            expr_ctx->fn_context(pred->fn_context_index())
+                    ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+    HybridSetBase* set = state->hybrid_set.get();
+
+    // if there are too many elements in InPredicate, exceed the limit,
+    // we will not push any condition of this column to storage engine.
+    // because too many conditions pushed down to storage engine may even
+    // slow down the query process.
+    // ATTN: This is just an experience value. You may need to try
+    // different thresholds to improve performance.
+    if (set->size() > _max_pushdown_conditions_per_column) {
+        VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit 
"
+                    << _max_pushdown_conditions_per_column;
+        return false;
+    }
+    return true;
+}
+
+bool VOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* 
fn_call,
+                                                      VExprContext* expr_ctx,
+                                                      std::string* 
constant_str,
+                                                      
doris_udf::FunctionContext** fn_ctx) {
+    // Now only `like` function filters is supported to push down
+    if (fn_call->fn().name.function_name != "like") {
+        return false;
+    }
+
+    const auto& children = fn_call->children();
+    doris_udf::FunctionContext* func_cxt = 
expr_ctx->fn_context(fn_call->fn_context_index());
+    DCHECK(func_cxt != nullptr);
+    DCHECK(children.size() == 2);
+    for (size_t i = 0; i < children.size(); i++) {
+        if (VExpr::expr_without_cast(children[i])->node_type() != 
TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            continue;
+        }
+        if (!children[1 - i]->is_constant()) {
+            // only handle constant value
+            return false;
+        } else {
+            DCHECK(children[1 - i]->type().is_string_type());
+            if (const ColumnConst* const_column = 
check_and_get_column<ColumnConst>(
+                        children[1 - i]->get_const_col(expr_ctx)->column_ptr)) 
{
+                *constant_str = const_column->get_data_at(0).to_string();
+            } else {
+                return false;
+            }
+        }
+    }
+    *fn_ctx = func_cxt;
+    return true;
+}
+
+bool VOlapScanNode::_should_push_down_binary_predicate(
+        VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* 
constant_val,
+        int* slot_ref_child, const std::function<bool(const std::string&)>& 
fn_checker) {
+    if (!fn_checker(fn_call->fn().name.function_name)) {
+        return false;
+    }
+
+    const auto& children = fn_call->children();
+    DCHECK(children.size() == 2);
+    for (size_t i = 0; i < children.size(); i++) {
+        if (VExpr::expr_without_cast(children[i])->node_type() != 
TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            continue;
+        }
+        if (!children[1 - i]->is_constant()) {
+            // only handle constant value
+            return false;
+        } else {
+            if (const ColumnConst* const_column = 
check_and_get_column<ColumnConst>(
+                        children[1 - i]->get_const_col(expr_ctx)->column_ptr)) 
{
+                *slot_ref_child = i;
+                *constant_val = const_column->get_data_at(0);
+            } else {
+                return false;
+            }
+        }
+    }
+    return true;
+}
+
+bool VOlapScanNode::_is_predicate_acting_on_slot(
+        VExpr* expr,
+        const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, 
VExpr**)>& checker,
+        SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
+    const VSlotRef* slot_ref = nullptr;
+    VExpr* child_contains_slot = nullptr;
+    if (!checker(expr->children(), &slot_ref, &child_contains_slot)) {
+        // not a slot ref(column)
+        return false;
+    }
+
+    auto entry = _id_to_slot_column_value_range.find(slot_ref->slot_id());
+    if (_id_to_slot_column_value_range.end() == entry) {
+        return false;
+    }
+    *slot_desc = entry->second.first;
+    DCHECK(child_contains_slot != nullptr);
+    if (child_contains_slot->type().type != (*slot_desc)->type().type) {
+        if (!ignore_cast(*slot_desc, child_contains_slot)) {
+            // the type of predicate not match the slot's type
+            return false;
+        }
+    }
+    *range = &(entry->second.second);
+    return true;
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_in_and_eq_predicate(VExpr* expr, 
VExprContext* expr_ctx,
+                                                     SlotDescriptor* slot,
+                                                     ColumnValueRange<T>& 
range, bool* push_down) {
+    auto temp_range = 
ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
+                                                                           
slot->type().scale);
+    bool effect = false;
+    // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
+    if (TExprNodeType::IN_PRED == expr->node_type()) {
+        VInPredicate* pred = static_cast<VInPredicate*>(expr);
+        if (!_should_push_down_in_predicate<false>(pred, expr_ctx)) {
+            return Status::OK();
+        }
+
+        // begin to push InPredicate value into ColumnValueRange
+        InState* state = reinterpret_cast<InState*>(
+                expr_ctx->fn_context(pred->fn_context_index())
+                        ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+        HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+        auto fn_name = std::string("");
+        while (iter->has_next()) {
+            // column in (nullptr) is always false so continue to
+            // dispose next item
+            if (nullptr == iter->get_value()) {
+                iter->next();
+                continue;
+            }
+            auto value = const_cast<void*>(iter->get_value());
+            RETURN_IF_ERROR(change_value_range<true>(
+                    temp_range, value, 
ColumnValueRange<T>::add_fixed_value_range, fn_name));
+            iter->next();
+        }
+
+        range.intersection(temp_range);
+        effect = true;
+    } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 2);
+        auto eq_checker = [](const std::string& fn_name) { return fn_name == 
"eq"; };
+
+        StringRef value;
+        int slot_ref_child = -1;
+        if 
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), 
expr_ctx,
+                                               &value, &slot_ref_child, 
eq_checker)) {
+            DCHECK(slot_ref_child >= 0);
+            // where A = nullptr should return empty result set
+            auto fn_name = std::string("");
+            if (value.data != nullptr) {
+                if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == 
TYPE_STRING ||
+                              T == TYPE_HLL) {
+                    auto val = StringValue(value.data, value.size);
+                    RETURN_IF_ERROR(change_value_range<true>(
+                            temp_range, reinterpret_cast<void*>(&val),
+                            ColumnValueRange<T>::add_fixed_value_range, 
fn_name));
+                } else {
+                    RETURN_IF_ERROR(change_value_range<true>(
+                            temp_range, 
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                            ColumnValueRange<T>::add_fixed_value_range, 
fn_name));
+                }
+                range.intersection(temp_range);
+                effect = true;
+            }
+        }
+    }
+
+    // exceed limit, no conditions will be pushed down to storage engine.
+    if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) {
+        range.set_whole_value_range();
+    } else {
+        *push_down = effect;
+    }
+    return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, 
VExprContext* expr_ctx,
+                                                             SlotDescriptor* 
slot,
+                                                             
ColumnValueRange<T>& range,
+                                                             bool* push_down) {
+    bool is_fixed_range = range.is_fixed_value_range();
+    auto not_in_range = 
ColumnValueRange<T>::create_empty_column_value_range(range.column_name());
+    bool effect = false;
+    // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
+    if (TExprNodeType::IN_PRED == expr->node_type()) {
+        VInPredicate* pred = static_cast<VInPredicate*>(expr);
+        if (!_should_push_down_in_predicate<true>(pred, expr_ctx)) {
+            return Status::OK();
+        }
+
+        // begin to push InPredicate value into ColumnValueRange
+        InState* state = reinterpret_cast<InState*>(
+                expr_ctx->fn_context(pred->fn_context_index())
+                        ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+        HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+        auto fn_name = std::string("");
+        while (iter->has_next()) {
+            // column not in (nullptr) is always true
+            if (nullptr == iter->get_value()) {
+                continue;
+            }
+            auto value = const_cast<void*>(iter->get_value());
+            if (is_fixed_range) {
+                RETURN_IF_ERROR(change_value_range<true>(
+                        range, value, 
ColumnValueRange<T>::remove_fixed_value_range, fn_name));
+            } else {
+                RETURN_IF_ERROR(change_value_range<true>(
+                        not_in_range, value, 
ColumnValueRange<T>::add_fixed_value_range, fn_name));
+            }
+            iter->next();
+        }
+        effect = true;
+    } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 2);
+
+        auto ne_checker = [](const std::string& fn_name) { return fn_name == 
"ne"; };
+        StringRef value;
+        int slot_ref_child = -1;
+        if 
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), 
expr_ctx,
+                                               &value, &slot_ref_child, 
ne_checker)) {
+            DCHECK(slot_ref_child >= 0);
+            // where A = nullptr should return empty result set
+            if (value.data != nullptr) {
+                auto fn_name = std::string("");
+                if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == 
TYPE_STRING ||
+                              T == TYPE_HLL) {
+                    auto val = StringValue(value.data, value.size);
+                    if (is_fixed_range) {
+                        RETURN_IF_ERROR(change_value_range<true>(
+                                range, reinterpret_cast<void*>(&val),
+                                ColumnValueRange<T>::remove_fixed_value_range, 
fn_name));
+                    } else {
+                        RETURN_IF_ERROR(change_value_range<true>(
+                                not_in_range, reinterpret_cast<void*>(&val),
+                                ColumnValueRange<T>::add_fixed_value_range, 
fn_name));
+                    }
+                } else {
+                    if (is_fixed_range) {
+                        RETURN_IF_ERROR(change_value_range<true>(
+                                range, 
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                                ColumnValueRange<T>::remove_fixed_value_range, 
fn_name));
+                    } else {
+                        RETURN_IF_ERROR(change_value_range<true>(
+                                not_in_range,
+                                
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                                ColumnValueRange<T>::add_fixed_value_range, 
fn_name));
+                    }
+                }
+                effect = true;
+            }
+        }
+    }
+
+    if (is_fixed_range ||
+        not_in_range.get_fixed_value_size() <= 
_max_pushdown_conditions_per_column) {
+        if (!is_fixed_range) {
+            // push down not in condition to storage engine
+            not_in_range.to_in_condition(_olap_filter, false);
+        }
+        *push_down = effect;
+    }
+    return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* 
expr_ctx,
+                                                   SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                                   bool* push_down) {
+    if (TExprNodeType::FUNCTION_CALL == expr->node_type()) {
+        if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name 
== "is_null_pred") {
+            auto temp_range = 
ColumnValueRange<T>::create_empty_column_value_range(
+                    slot->type().precision, slot->type().scale);
+            temp_range.set_contain_null(true);
+            range.intersection(temp_range);
+            *push_down = true;
+        } else if 
(reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name ==
+                   "is_not_null_pred") {
+            auto temp_range = 
ColumnValueRange<T>::create_empty_column_value_range(
+                    slot->type().precision, slot->type().scale);
+            temp_range.set_contain_null(false);
+            range.intersection(temp_range);
+            *push_down = true;
+        }
+    }
+    return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_noneq_binary_predicate(VExpr* expr, 
VExprContext* expr_ctx,
+                                                        SlotDescriptor* slot,
+                                                        ColumnValueRange<T>& 
range,
+                                                        bool* push_down) {
+    if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 2);
+
+        auto noneq_checker = [](const std::string& fn_name) {
+            return fn_name != "ne" && fn_name != "eq";
+        };
+        StringRef value;
+        int slot_ref_child = -1;
+        if 
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), 
expr_ctx,
+                                               &value, &slot_ref_child, 
noneq_checker)) {
+            DCHECK(slot_ref_child >= 0);
+            const std::string& fn_name =
+                    
reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name;
+
+            // where A = nullptr should return empty result set
+            if (value.data != nullptr) {
+                *push_down = true;
+                if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == 
TYPE_STRING ||
+                              T == TYPE_HLL) {
+                    auto val = StringValue(value.data, value.size);
+                    RETURN_IF_ERROR(change_value_range<false>(range, 
reinterpret_cast<void*>(&val),
+                                                              
ColumnValueRange<T>::add_value_range,
+                                                              fn_name, 
slot_ref_child));
+                } else {
+                    RETURN_IF_ERROR(change_value_range<false>(
+                            range, 
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                            ColumnValueRange<T>::add_value_range, fn_name, 
slot_ref_child));
+                }
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status VOlapScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* 
expr_ctx,
+                                              SlotDescriptor* slot, bool* 
push_down) {
+    if (TExprNodeType::BLOOM_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 1);
+        _bloom_filters_push_down.emplace_back(slot->col_name(), 
expr->get_bloom_filter_func());
+        *push_down = true;
+    }
+    return Status::OK();
+}
+
+Status VOlapScanNode::_normalize_function_filters(VExpr* expr, VExprContext* 
expr_ctx,
+                                                  SlotDescriptor* slot, bool* 
push_down) {
+    if (!config::enable_function_pushdown) {
+        return Status::OK();
+    }
+    bool opposite = false;
+    VExpr* fn_expr = expr;
+    if (TExprNodeType::COMPOUND_PRED == expr->node_type() &&
+        expr->fn().name.function_name == "not") {
+        fn_expr = fn_expr->children()[0];
+        opposite = true;
+    }
+
+    if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) {
+        doris_udf::FunctionContext* fn_ctx = nullptr;
+        std::string str;
+        if 
(_should_push_down_function_filter(reinterpret_cast<VectorizedFnCall*>(fn_expr),
+                                              expr_ctx, &str, &fn_ctx)) {
+            std::string col = slot->col_name();
+            StringVal val(reinterpret_cast<uint8_t*>(str.data()), str.size());
+            _push_down_functions.emplace_back(opposite, col, fn_ctx, val);
+            *push_down = true;
+        }
+    }
+    return Status::OK();
+}
+
+void VOlapScanNode::eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, 
bool* push_down) {
+    char* constant_val = nullptr;
+    if (vexpr->is_constant()) {
+        if (const ColumnConst* const_column =
+                    
check_and_get_column<ColumnConst>(vexpr->get_const_col(expr_ctx)->column_ptr)) {
+            constant_val = 
const_cast<char*>(const_column->get_data_at(0).data);
+            if (constant_val == nullptr || 
*reinterpret_cast<bool*>(constant_val) == false) {
+                *push_down = true;
+                _eos = true;
+            }
+        } else {
+            LOG(WARNING) << "Expr[" << vexpr->debug_string()
+                         << "] is a constant but doesn't contain a const 
column!";
+        }
+    }
+}
+
+VExpr* VOlapScanNode::_normalize_predicate(RuntimeState* state, VExpr* 
conjunct_expr_root) {
+    static constexpr auto is_leaf = [](VExpr* expr) { return 
!expr->is_and_expr(); };
+    auto in_predicate_checker = [](const std::vector<VExpr*>& children, const 
VSlotRef** slot,
+                                   VExpr** child_contains_slot) {
+        if (children.empty() ||
+            VExpr::expr_without_cast(children[0])->node_type() != 
TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            return false;
+        }
+        *slot = reinterpret_cast<const 
VSlotRef*>(VExpr::expr_without_cast(children[0]));
+        *child_contains_slot = children[0];
+        return true;
+    };
+    auto eq_predicate_checker = [](const std::vector<VExpr*>& children, const 
VSlotRef** slot,
+                                   VExpr** child_contains_slot) {
+        for (const VExpr* child : children) {
+            if (VExpr::expr_without_cast(child)->node_type() != 
TExprNodeType::SLOT_REF) {
+                // not a slot ref(column)
+                continue;
+            }
+            *slot = reinterpret_cast<const 
VSlotRef*>(VExpr::expr_without_cast(child));
+            *child_contains_slot = const_cast<VExpr*>(child);
+            return true;
+        }
+        return false;
+    };
+
+    if (conjunct_expr_root != nullptr) {
+        if (is_leaf(conjunct_expr_root)) {
+            auto impl = conjunct_expr_root->get_impl();
+            VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : 
conjunct_expr_root;
+            SlotDescriptor* slot;
+            ColumnValueRangeType* range = nullptr;
+            bool push_down = false;
+            eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()), 
&push_down);
+            if (!push_down &&
+                (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, 
&slot, &range) ||
+                 _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, 
&slot, &range))) {
+                std::visit(
+                        [&](auto& value_range) {
+                            RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
+                                    &push_down));
+                            
RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
+                                    &push_down));
+                            RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
+                                    &push_down));
+                            
RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
+                                    &push_down));
+                            if (is_key_column(slot->col_name())) {
+                                RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
+                                        cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, &push_down));
+                                
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
+                                        cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, &push_down));
+                            }
+                        },
+                        *range);
+            }
+            if (push_down && is_key_column(slot->col_name())) {
+                return nullptr;
+            } else {
+                return conjunct_expr_root;
+            }
+        } else {
+            VExpr* left_child = _normalize_predicate(state, 
conjunct_expr_root->children()[0]);
+            VExpr* right_child = _normalize_predicate(state, 
conjunct_expr_root->children()[1]);
+
+            if (left_child != nullptr && right_child != nullptr) {
+                conjunct_expr_root->set_children({left_child, right_child});
+                return conjunct_expr_root;
+            } else {
+                // here only close the and expr self, do not close the child
+                conjunct_expr_root->set_children({});
+                conjunct_expr_root->close(state, *_vconjunct_ctx_ptr,
+                                          
(*_vconjunct_ctx_ptr)->get_function_state_scope());
+            }
+
+            // here do not close Expr* now
+            return left_child != nullptr ? left_child : right_child;
+        }
+    }
+    return conjunct_expr_root;
+}
+
+Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, 
std::vector<VExpr*>& vexprs) {
+    if (!vexprs.empty()) {
+        auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : 
vexprs[0];
+        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+            if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
+                continue;
+            }
+            TFunction fn;
+            TFunctionName fn_name;
+            fn_name.__set_db_name("");
+            fn_name.__set_function_name("and");
+            fn.__set_name(fn_name);
+            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+            std::vector<TTypeDesc> arg_types;
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_arg_types(arg_types);
+            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_has_var_args(false);
+
+            TExprNode texpr_node;
+            
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+            texpr_node.__set_fn(fn);
+            texpr_node.__set_is_nullable(last_expr->is_nullable() || 
vexprs[j]->is_nullable());
+            VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+            new_node->add_child(last_expr);
+            DCHECK((vexprs[j])->get_impl() != nullptr);
+            new_node->add_child(vexprs[j]);
+            last_expr = new_node;
+            _rf_vexpr_set.insert(vexprs[j]);
+        }
+        auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+        if (_vconjunct_ctx_ptr) {
+            (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
+        }
+        RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc()));
+        RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state));
+        if (_vconjunct_ctx_ptr) {
+            (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+            _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+        }
+        _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+        *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+    }
+    return Status::OK();
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scan_node.h 
b/be/src/vec/exec/volap_scan_node.h
index 4ead17f9b3..e047f34c10 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -26,6 +26,9 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "olap/tablet.h"
 #include "util/progress_updater.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/exprs/vslot_ref.h"
 
 namespace doris {
 class ObjectPool;
@@ -63,40 +66,15 @@ private:
     // In order to ensure the accuracy of the query result
     // only key column conjuncts will be remove as idle conjunct
     bool is_key_column(const std::string& key_name);
-    void remove_pushed_conjuncts(RuntimeState* state);
 
     Status start_scan(RuntimeState* state);
-    void eval_const_conjuncts();
     Status normalize_conjuncts();
     Status build_key_ranges_and_filters();
-    Status build_function_filters();
 
-    template <PrimitiveType T>
-    Status normalize_predicate(ColumnValueRange<T>& range, SlotDescriptor* 
slot);
-
-    template <PrimitiveType T>
-    Status normalize_in_and_eq_predicate(SlotDescriptor* slot, 
ColumnValueRange<T>* range);
-
-    template <PrimitiveType T>
-    Status normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, 
ColumnValueRange<T>* range);
-
-    template <PrimitiveType T>
-    Status normalize_noneq_binary_predicate(SlotDescriptor* slot, 
ColumnValueRange<T>* range);
-
-    Status normalize_bloom_filter_predicate(SlotDescriptor* slot);
-
-    template <PrimitiveType T>
-    static bool normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
-                                            const std::string& is_null_str,
-                                            ColumnValueRange<T>* range);
-    bool should_push_down_in_predicate(SlotDescriptor* slot, InPredicate* 
in_pred);
-
-    template <PrimitiveType T, typename ChangeFixedValueRangeFunc>
-    static Status change_fixed_value_range(ColumnValueRange<T>& range, void* 
value,
-                                           const ChangeFixedValueRangeFunc& 
func);
-
-    std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, 
Expr* pred,
-                                                         int conj_idx, int 
child_idx);
+    template <bool IsFixed, PrimitiveType PrimitiveType, typename 
ChangeFixedValueRangeFunc>
+    static Status change_value_range(ColumnValueRange<PrimitiveType>& range, 
void* value,
+                                     const ChangeFixedValueRangeFunc& func,
+                                     const std::string& fn_name, int 
slot_ref_child = -1);
 
     void transfer_thread(RuntimeState* state);
     void scanner_thread(VOlapScanner* scanner);
@@ -114,6 +92,54 @@ private:
         return _runtime_filter_descs;
     }
 
+    template <PrimitiveType T>
+    Status _normalize_in_and_eq_predicate(vectorized::VExpr* expr, 
VExprContext* expr_ctx,
+                                          SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                          bool* push_down);
+    template <PrimitiveType T>
+    Status _normalize_not_in_and_not_eq_predicate(vectorized::VExpr* expr, 
VExprContext* expr_ctx,
+                                                  SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                                  bool* push_down);
+
+    template <PrimitiveType T>
+    Status _normalize_noneq_binary_predicate(vectorized::VExpr* expr, 
VExprContext* expr_ctx,
+                                             SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                             bool* push_down);
+
+    template <PrimitiveType T>
+    Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* 
expr_ctx,
+                                        SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                        bool* push_down);
+
+    Status _normalize_bloom_filter(vectorized::VExpr* expr, VExprContext* 
expr_ctx,
+                                   SlotDescriptor* slot, bool* push_down);
+
+    void eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* 
push_down);
+
+    vectorized::VExpr* _normalize_predicate(RuntimeState* state,
+                                            vectorized::VExpr* 
conjunct_expr_root);
+
+    Status _normalize_function_filters(vectorized::VExpr* expr, VExprContext* 
expr_ctx,
+                                       SlotDescriptor* slot, bool* push_down);
+
+    template <bool IsNotIn>
+    bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* 
expr_ctx);
+
+    bool _is_predicate_acting_on_slot(VExpr* expr,
+                                      const std::function<bool(const 
std::vector<VExpr*>&,
+                                                               const 
VSlotRef**, VExpr**)>& checker,
+                                      SlotDescriptor** slot_desc, 
ColumnValueRangeType** range);
+
+    bool _should_push_down_binary_predicate(
+            VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* 
constant_val,
+            int* slot_ref_child, const std::function<bool(const 
std::string&)>& fn_checker);
+
+    bool _should_push_down_function_filter(VectorizedFnCall* fn_call, 
VExprContext* expr_ctx,
+                                           std::string* constant_str,
+                                           doris_udf::FunctionContext** 
fn_ctx);
+
+    Status _append_rf_into_conjuncts(RuntimeState* state, std::vector<VExpr*>& 
vexprs);
+
     // Tuple id resolved in prepare() to set _tuple_desc;
     TupleId _tuple_id;
     // doris scan node used to scan doris
@@ -124,9 +150,6 @@ private:
     int _tuple_idx;
     // string slots
     std::vector<SlotDescriptor*> _string_slots;
-    // conjunct's index which already be push down storage engine
-    // should be remove in olap_scan_node, no need check this conjunct again
-    std::set<uint32_t> _pushed_conjuncts_index;
     // collection slots
     std::vector<SlotDescriptor*> _collection_slots;
 
@@ -149,11 +172,6 @@ private:
     // push down functions to storage engine
     // only support scalar functions, now just support like / not like
     std::vector<FunctionFilter> _push_down_functions;
-    // functions conjunct's index which already be push down storage engine
-    std::set<uint32_t> _pushed_func_conjuncts_index;
-    // need keep these conjunct to the end of scan node,
-    // since some memory referenced by pushed function filters
-    std::vector<ExprContext*> _pushed_func_conjunct_ctxs;
 
     // Pool for storing allocated scanner objects.  We don't want to use the
     // runtime pool to ensure that the scanner objects are deleted before this
@@ -183,7 +201,6 @@ private:
     // Used in Scan thread to ensure thread-safe
     std::atomic_bool _scanner_done;
     std::atomic_bool _transfer_done;
-    size_t _direct_conjunct_size;
 
     int _total_assign_num;
     int _nice;
@@ -321,6 +338,8 @@ private:
 
     phmap::flat_hash_set<VExpr*> _rf_vexpr_set;
     std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
+    phmap::flat_hash_map<int, std::pair<SlotDescriptor*, ColumnValueRangeType>>
+            _id_to_slot_column_value_range;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index c032aee301..6ce3182c1c 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -122,10 +122,6 @@ Status VOlapScanner::open() {
     SCOPED_TIMER(_parent->_reader_init_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
 
-    if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) {
-        _use_pushdown_conjuncts = true;
-    }
-
     _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), 
false);
 
     auto res = _tablet_reader->init(_tablet_reader_params);
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 1ea8639c6c..7584eb47c9 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -83,10 +83,6 @@ public:
 
     int64_t update_wait_worker_timer() const { return _watcher.elapsed_time(); 
}
 
-    void set_use_pushdown_conjuncts(bool has_pushdown_conjuncts) {
-        _use_pushdown_conjuncts = has_pushdown_conjuncts;
-    }
-
     std::vector<bool>* mutable_runtime_filter_marks() { return 
&_runtime_filter_marks; }
 
     TabletStorageType get_storage_type();
@@ -117,7 +113,6 @@ private:
     bool _aggregation;
     bool _need_agg_finalize = true;
     bool _has_update_counter = false;
-    bool _use_pushdown_conjuncts = false;
 
     TabletReader::ReaderParams _tablet_reader_params;
     std::unique_ptr<TabletReader> _tablet_reader;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index 6f73b2c69d..c708efdda0 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -90,7 +90,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* 
block, int* result
 const std::string& VBloomPredicate::expr_name() const {
     return _expr_name;
 }
-void VBloomPredicate::set_filter(std::unique_ptr<IBloomFilterFuncBase>& 
filter) {
-    _filter.reset(filter.release());
+void VBloomPredicate::set_filter(std::shared_ptr<IBloomFilterFuncBase>& 
filter) {
+    _filter = filter;
 }
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h 
b/be/src/vec/exprs/vbloom_predicate.h
index f715b9e40e..0f00d6ca55 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -37,7 +37,9 @@ public:
         return pool->add(new VBloomPredicate(*this));
     }
     const std::string& expr_name() const override;
-    void set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter);
+    void set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter);
+
+    std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const 
override { return _filter; }
 
 private:
     std::shared_ptr<IBloomFilterFuncBase> _filter;
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 909feb324f..d493a87a65 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -354,7 +354,7 @@ Status VExpr::init_function_context(VExprContext* context,
 
 void VExpr::close_function_context(VExprContext* context, 
FunctionContext::FunctionStateScope scope,
                                    const FunctionBasePtr& function) const {
-    if (_fn_context_index != -1) {
+    if (_fn_context_index != -1 && !context->_stale) {
         FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
         function->close(fn_ctx, FunctionContext::THREAD_LOCAL);
         if (scope == FunctionContext::FRAGMENT_LOCAL) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b6921753e8..9d2d4ce64b 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "gen_cpp/Exprs_types.h"
 #include "runtime/types.h"
 #include "udf/udf_internal.h"
@@ -137,6 +138,25 @@ public:
     /// expr.
     virtual ColumnPtrWrapper* get_const_col(VExprContext* context);
 
+    int fn_context_index() { return _fn_context_index; };
+
+    static const VExpr* expr_without_cast(const VExpr* expr) {
+        if (expr->node_type() == doris::TExprNodeType::CAST_EXPR) {
+            return expr_without_cast(expr->_children[0]);
+        }
+        return expr;
+    }
+
+    // If this expr is a RuntimeFilterWrapper, this method will return an 
underlying rf expression
+    virtual const VExpr* get_impl() const { return nullptr; }
+
+    // If this expr is a BloomPredicate, this method will return a 
BloomFilterFunc
+    virtual std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() 
const {
+        LOG(FATAL) << "Method 'get_bloom_filter_func()' is not supported in 
expression: "
+                   << this->debug_string();
+        return nullptr;
+    }
+
 protected:
     /// Simple debug string that provides no expr subclass-specific information
     std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index dfb4223741..da4a3a96eb 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -28,7 +28,8 @@ VExprContext::VExprContext(VExpr* expr)
           _prepared(false),
           _opened(false),
           _closed(false),
-          _last_result_column_id(-1) {}
+          _last_result_column_id(-1),
+          _stale(false) {}
 
 VExprContext::~VExprContext() {
     DCHECK(!_prepared || _closed);
@@ -98,6 +99,12 @@ doris::Status VExprContext::clone(RuntimeState* state, 
VExprContext** new_ctx) {
     return _root->open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
 }
 
+void VExprContext::clone_fn_contexts(VExprContext* other) {
+    for (auto& _fn_context : _fn_contexts) {
+        
other->_fn_contexts.push_back(_fn_context->impl()->clone(other->_pool.get()));
+    }
+}
+
 int VExprContext::register_func(RuntimeState* state, const 
FunctionContext::TypeDesc& return_type,
                                 const std::vector<FunctionContext::TypeDesc>& 
arg_types,
                                 int varargs_buffer_size) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index fb31925136..b454833ed3 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -69,6 +69,13 @@ public:
         return _is_clone ? FunctionContext::THREAD_LOCAL : 
FunctionContext::FRAGMENT_LOCAL;
     }
 
+    void clone_fn_contexts(VExprContext* other);
+
+    void mark_as_stale() {
+        DCHECK(!_stale);
+        _stale = true;
+    }
+
 private:
     friend class VExpr;
 
@@ -91,5 +98,7 @@ private:
     std::unique_ptr<MemPool> _pool;
 
     int _last_result_column_id;
+
+    bool _stale;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 64cc9acefa..6b2264d40a 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -41,6 +41,10 @@ public:
 
     virtual std::string debug_string() const override;
 
+    const FunctionBasePtr function() { return _function; };
+
+    const bool is_not_in() { return _is_not_in; };
+
 private:
     FunctionBasePtr _function;
     std::string _expr_name;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h 
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 755d243fd8..91a6bdbcac 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -44,6 +44,8 @@ public:
         return _impl->get_const_col(context);
     }
 
+    const VExpr* get_impl() const override { return _impl; }
+
 private:
     VExpr* _impl;
 
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 1bc78a4c5c..0f8540fc9e 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -41,6 +41,8 @@ public:
 
     const int column_id() const { return _column_id; }
 
+    const int slot_id() const { return _slot_id; }
+
 private:
     FunctionPtr _function;
     int _slot_id;
diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.cpp
index 4e5877a6ec..b0bc1d137f 100644
--- a/be/src/vec/functions/in.cpp
+++ b/be/src/vec/functions/in.cpp
@@ -16,164 +16,12 @@
 // under the License.
 // This file is copied from
 
-#include <fmt/format.h>
+#include <vec/functions/in.h>
 
-#include "exprs/create_predicate_function.h"
-#include "vec/columns/column_const.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_set.h"
-#include "vec/columns/columns_number.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/functions/function.h"
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
 
-struct InState {
-    bool use_set = true;
-
-    // only use in null in set
-    bool null_in_set = false;
-    std::unique_ptr<HybridSetBase> hybrid_set;
-};
-
-template <bool negative>
-class FunctionIn : public IFunction {
-public:
-    static constexpr auto name = negative ? "not_in" : "in";
-
-    static FunctionPtr create() { return std::make_shared<FunctionIn>(); }
-
-    String get_name() const override { return name; }
-
-    bool is_variadic() const override { return true; }
-
-    size_t get_number_of_arguments() const override { return 0; }
-
-    DataTypePtr get_return_type_impl(const DataTypes& args) const override {
-        for (const auto& arg : args) {
-            if (arg->is_nullable()) return 
make_nullable(std::make_shared<DataTypeUInt8>());
-        }
-        return std::make_shared<DataTypeUInt8>();
-    }
-
-    bool use_default_implementation_for_nulls() const override { return false; 
}
-
-    Status prepare(FunctionContext* context, 
FunctionContext::FunctionStateScope scope) override {
-        if (scope == FunctionContext::THREAD_LOCAL) {
-            return Status::OK();
-        }
-        auto* state = new InState();
-        context->set_function_state(scope, state);
-        state->hybrid_set.reset(
-                
vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
-
-        DCHECK(context->get_num_args() >= 1);
-        for (int i = 1; i < context->get_num_args(); ++i) {
-            const auto& const_column_ptr = context->get_constant_col(i);
-            if (const_column_ptr != nullptr) {
-                auto const_data = const_column_ptr->column_ptr->get_data_at(0);
-                if (const_data.data == nullptr) {
-                    state->null_in_set = true;
-                } else {
-                    state->hybrid_set->insert((void*)const_data.data, 
const_data.size);
-                }
-            } else {
-                state->use_set = false;
-                break;
-            }
-        }
-        return Status::OK();
-    }
-
-    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                        size_t result, size_t input_rows_count) override {
-        auto in_state = reinterpret_cast<InState*>(
-                context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
-        if (!in_state) {
-            return Status::RuntimeError("funciton context for function '{}' 
must have Set;",
-                                        get_name());
-        }
-        auto res = ColumnUInt8::create();
-        ColumnUInt8::Container& vec_res = res->get_data();
-        vec_res.resize(input_rows_count);
-
-        ColumnUInt8::MutablePtr col_null_map_to;
-        col_null_map_to = ColumnUInt8::create(input_rows_count);
-        auto& vec_null_map_to = col_null_map_to->get_data();
-
-        /// First argument may be a single column.
-        const ColumnWithTypeAndName& left_arg = 
block.get_by_position(arguments[0]);
-        auto materialized_column = 
left_arg.column->convert_to_full_column_if_const();
-
-        if (in_state->use_set) {
-            for (size_t i = 0; i < input_rows_count; ++i) {
-                const auto& ref_data = materialized_column->get_data_at(i);
-                if (ref_data.data) {
-                    vec_res[i] = negative ^
-                                 
in_state->hybrid_set->find((void*)ref_data.data, ref_data.size);
-                    if (in_state->null_in_set) {
-                        vec_null_map_to[i] = negative == vec_res[i];
-                    } else {
-                        vec_null_map_to[i] = false;
-                    }
-                } else {
-                    vec_null_map_to[i] = true;
-                }
-            }
-        } else {
-            std::vector<ColumnPtr> set_columns;
-            for (int i = 1; i < arguments.size(); ++i) {
-                
set_columns.emplace_back(block.get_by_position(arguments[i]).column);
-            }
-
-            for (size_t i = 0; i < input_rows_count; ++i) {
-                const auto& ref_data = materialized_column->get_data_at(i);
-                if (ref_data.data == nullptr) {
-                    vec_null_map_to[i] = true;
-                    continue;
-                }
-
-                std::unique_ptr<HybridSetBase> hybrid_set(
-                        
vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
-                bool null_in_set = false;
-
-                for (const auto& set_column : set_columns) {
-                    auto set_data = set_column->get_data_at(i);
-                    if (set_data.data == nullptr)
-                        null_in_set = true;
-                    else
-                        hybrid_set->insert((void*)(set_data.data), 
set_data.size);
-                }
-                vec_res[i] = negative ^ hybrid_set->find((void*)ref_data.data, 
ref_data.size);
-                if (null_in_set) {
-                    vec_null_map_to[i] = negative == vec_res[i];
-                } else {
-                    vec_null_map_to[i] = false;
-                }
-            }
-        }
-
-        if (block.get_by_position(result).type->is_nullable()) {
-            block.replace_by_position(
-                    result, ColumnNullable::create(std::move(res), 
std::move(col_null_map_to)));
-        } else {
-            block.replace_by_position(result, std::move(res));
-        }
-
-        return Status::OK();
-    }
-
-    Status close(FunctionContext* context, FunctionContext::FunctionStateScope 
scope) override {
-        if (scope == FunctionContext::FRAGMENT_LOCAL) {
-            delete reinterpret_cast<InState*>(
-                    
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
-        }
-        return Status::OK();
-    }
-};
-
 void register_function_in(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionIn<false>>();
     factory.register_function<FunctionIn<true>>();
diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.h
similarity index 96%
copy from be/src/vec/functions/in.cpp
copy to be/src/vec/functions/in.h
index 4e5877a6ec..d10661fa48 100644
--- a/be/src/vec/functions/in.cpp
+++ b/be/src/vec/functions/in.h
@@ -16,6 +16,8 @@
 // under the License.
 // This file is copied from
 
+#pragma once
+
 #include <fmt/format.h>
 
 #include "exprs/create_predicate_function.h"
@@ -26,7 +28,6 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/functions/function.h"
-#include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
 
@@ -174,9 +175,4 @@ public:
     }
 };
 
-void register_function_in(SimpleFunctionFactory& factory) {
-    factory.register_function<FunctionIn<false>>();
-    factory.register_function<FunctionIn<true>>();
-}
-
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to