This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 8bf2dae26cf [refactor](parquet) Use column predicates to do filtering 
(#56904) (#59619)
8bf2dae26cf is described below

commit 8bf2dae26cf11de9ecd97fa6e7fa1dc145a5d462
Author: Gabriel <[email protected]>
AuthorDate: Thu Jan 8 11:15:38 2026 +0800

    [refactor](parquet) Use column predicates to do filtering (#56904) (#59619)
    
    pick #56739 #56904
---
 be/src/olap/block_column_predicate.h               |   29 +
 be/src/olap/column_predicate.h                     |   10 +
 be/src/olap/comparison_predicate.h                 |   47 +
 be/src/olap/delete_handler.cpp                     |    7 +-
 be/src/olap/in_list_predicate.h                    |   73 +-
 be/src/olap/null_predicate.h                       |   11 +
 be/src/olap/predicate_creator.h                    |  124 +--
 be/src/olap/tablet_reader.cpp                      |    3 +-
 be/src/runtime/runtime_predicate.cpp               |    6 +-
 be/src/runtime/runtime_predicate.h                 |    2 +-
 be/src/vec/core/field.cpp                          |   70 ++
 be/src/vec/core/field.h                            |    1 +
 be/src/vec/data_types/data_type_factory.cpp        |   10 +-
 be/src/vec/data_types/data_type_factory.hpp        |    5 +-
 be/src/vec/exec/format/generic_reader.cpp          |  252 +++++
 be/src/vec/exec/format/generic_reader.h            |   26 +
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  |   64 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  272 ++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   24 +-
 be/src/vec/runtime/vparquet_transformer.cpp        |   21 -
 be/src/vec/runtime/vparquet_transformer.h          |   17 +-
 be/test/olap/block_column_predicate_test.cpp       | 1005 ++++++++++++++++++++
 .../vec/exec/format/parquet/parquet_expr_test.cpp  |   70 +-
 23 files changed, 1694 insertions(+), 455 deletions(-)

diff --git a/be/src/olap/block_column_predicate.h 
b/be/src/olap/block_column_predicate.h
index 1c4507f1496..a8728a768fb 100644
--- a/be/src/olap/block_column_predicate.h
+++ b/be/src/olap/block_column_predicate.h
@@ -33,6 +33,7 @@
 #include "olap/column_predicate.h"
 #include "olap/olap_common.h"
 #include "vec/columns/column.h"
+#include "vec/exec/format/parquet/parquet_pred_cmp.h"
 
 namespace roaring {
 class Roaring;
@@ -79,6 +80,10 @@ public:
         throw Exception(Status::FatalError("should not reach here"));
     }
 
+    // Overload that evaluates the predicate against Parquet column statistics.
+    // This base-class default always throws a fatal "should not reach here"
+    // error; subclasses that support statistics-based evaluation override it.
+    virtual bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const {
+        throw Exception(Status::FatalError("should not reach here"));
+    }
+
     virtual bool evaluate_and(const segment_v2::BloomFilter* bf) const {
         throw Exception(Status::FatalError("should not reach here"));
     }
@@ -117,6 +122,9 @@ public:
                       bool* flags) const override;
     bool support_zonemap() const override { return 
_predicate->support_zonemap(); }
     bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override;
+    // Forwards the Parquet-statistics check to the wrapped column predicate and
+    // returns its answer unchanged.
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        return _predicate->evaluate_and(statistic);
+    }
     bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
     bool evaluate_and(const StringRef* dict_words, const size_t dict_num) 
const override;
     void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, 
uint16_t selected_size,
@@ -180,6 +188,18 @@ public:
                       bool* flags) const override;
     void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, 
uint16_t selected_size,
                      bool* flags) const override;
+    // OR semantics over Parquet column statistics: returns true as soon as any
+    // child predicate reports a possible match, and false when no child does
+    // (which includes the case of having no children at all).
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        // A single child needs no special handling: the loop below yields exactly
+        // that child's answer.
+        const auto child_count = num_of_column_predicate();
+        for (int idx = 0; idx < child_count; ++idx) {
+            if (_block_column_predicate_vec[idx]->evaluate_and(statistic)) {
+                return true;
+            }
+        }
+        return false;
+    }
 
     // note(wb) we didnt't implement evaluate_vec method here, because storage 
layer only support AND predicate now;
 };
@@ -203,6 +223,15 @@ public:
 
     bool evaluate_and(const StringRef* dict_words, const size_t dict_num) 
const override;
 
+    // AND semantics over Parquet column statistics: every child predicate must
+    // report a possible match. Evaluation stops at the first child that rules
+    // the data out; with no children the result is true.
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        bool may_match = true;
+        for (auto child_it = _block_column_predicate_vec.begin();
+             may_match && child_it != _block_column_predicate_vec.end(); ++child_it) {
+            may_match = (*child_it)->evaluate_and(statistic);
+        }
+        return may_match;
+    }
+
     bool can_do_bloom_filter(bool ngram) const override {
         for (auto& pred : _block_column_predicate_vec) {
             if (!pred->can_do_bloom_filter(ngram)) {
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 349c72d71d7..88328ffc080 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -27,6 +27,7 @@
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
 #include "vec/columns/column.h"
+#include "vec/exec/format/parquet/parquet_pred_cmp.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 
 using namespace doris::segment_v2;
@@ -222,6 +223,15 @@ public:
 
     virtual bool can_do_bloom_filter(bool ngram) const { return false; }
 
+    /**
+     * Decide from Parquet column statistics whether the described data may
+     * contain rows matching this predicate (true) or can be skipped (false).
+     *
+     * This default implementation does not support statistics-based evaluation
+     * and always throws an INTERNAL_ERROR exception; predicates that support it
+     * override this method.
+     */
+    virtual bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const {
+        // No return statement follows: control never passes the throw.
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "ParquetPredicate is not supported by this predicate!");
+    }
+
     // used to evaluate pre read column in lazy materialization
     // now only support integer/float
     // a vectorized eval way
diff --git a/be/src/olap/comparison_predicate.h 
b/be/src/olap/comparison_predicate.h
index 55090188b35..a04340a66b8 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -142,6 +142,53 @@ public:
         }
     }
 
+    /**
+     * Decide from Parquet min/max statistics whether this page may contain
+     * matching rows.
+     *
+     * Returns true (keep the page and evaluate each value) when the statistics
+     * callback returns false, when min/max cannot be decoded, or when the
+     * [min, max] interval may satisfy the predicate:
+     * 1. EQ: min <= `_value` <= max.
+     * 2. NE: min or max differs from `_value`.
+     * 3. LT|LE: min <= `_value`.
+     * 4. GT|GE: max >= `_value`.
+     * Returns false only when the statistics rule out every match.
+     */
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        // The callback is handed the stat object and this column's id; if it
+        // returns false there is nothing to prune with, so keep the page.
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        vectorized::Field min_field;
+        vectorized::Field max_field;
+        // Undecodable min/max values are treated the same way: keep the page.
+        if (!vectorized::ParquetPredicate::get_min_max_value(
+                     statistic->col_schema, statistic->encoded_min_value,
+                     statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
+                     .ok()) {
+            return true;
+        }
+
+        // Integer/bool/floating-point values are read from the Field as the
+        // nearest field type and converted to the predicate's C++ type; every
+        // other type is read as the C++ type directly.
+        const auto to_native = [](vectorized::Field& field) -> T {
+            using NativeType = typename PrimitiveTypeTraits<Type>::CppType;
+            if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
+                using NearestType = typename PrimitiveTypeTraits<Type>::NearestFieldType;
+                return static_cast<NativeType>(field.template get<NearestType>());
+            } else {
+                return field.template get<NativeType>();
+            }
+        };
+        const T min_value = to_native(min_field);
+        const T max_value = to_native(max_field);
+
+        // NOTE(review): `_opposite` is not consulted here - confirm that callers
+        // never reach this path with an opposite predicate.
+        if constexpr (PT == PredicateType::EQ) {
+            return Compare::less_equal(min_value, _value) &&
+                   Compare::greater_equal(max_value, _value);
+        } else if constexpr (PT == PredicateType::NE) {
+            return !Compare::equal(min_value, _value) || !Compare::equal(max_value, _value);
+        } else if constexpr (PT == PredicateType::LT || PT == PredicateType::LE) {
+            // Inclusive for both LT and LE: conservative for LT, where a page
+            // whose min equals `_value` is kept although it cannot match.
+            return Compare::less_equal(min_value, _value);
+        } else {
+            static_assert(PT == PredicateType::GT || PT == PredicateType::GE);
+            // Inclusive for both GT and GE: conservative for GT in the same way.
+            return Compare::greater_equal(max_value, _value);
+        }
+    }
+
     bool is_always_true(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override {
         if (statistic.first->is_null() || statistic.second->is_null()) {
             return false;
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 24ff8c27f75..11f89eff0c0 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -370,7 +370,8 @@ Status DeleteHandler::_parse_column_pred(TabletSchemaSPtr 
complete_schema,
         condition.__set_column_unique_id(col_unique_id);
         const auto& column = complete_schema->column_by_uid(col_unique_id);
         uint32_t index = complete_schema->field_index(col_unique_id);
-        auto* predicate = parse_to_predicate(column, index, condition, 
_predicate_arena, true);
+        auto* predicate =
+                parse_to_predicate(column.get_vec_type(), index, condition, 
_predicate_arena, true);
         if (predicate != nullptr) {
             delete_conditions->column_predicate_vec.push_back(predicate);
         }
@@ -440,8 +441,8 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
             }
             const auto& column = tablet_schema->column_by_uid(col_unique_id);
             uint32_t index = tablet_schema->field_index(col_unique_id);
-            temp.column_predicate_vec.push_back(
-                    parse_to_predicate(column, index, condition, 
_predicate_arena, true));
+            temp.column_predicate_vec.push_back(parse_to_predicate(
+                    column.get_vec_type(), index, condition, _predicate_arena, 
true));
         }
 
         _del_conds.emplace_back(std::move(temp));
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index c8714773232..976cb6f9e26 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -36,6 +36,7 @@
 #include "vec/columns/column_dictionary.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
 
 // for uint24_t
 template <>
@@ -67,8 +68,8 @@ public:
     using T = typename PrimitiveTypeTraits<Type>::CppType;
     template <typename ConditionType, typename ConvertFunc>
     InListPredicateBase(uint32_t column_id, const ConditionType& conditions,
-                        const ConvertFunc& convert, bool is_opposite, const 
TabletColumn* col,
-                        vectorized::Arena& arena)
+                        const ConvertFunc& convert, bool is_opposite,
+                        const vectorized::DataTypePtr& data_type, 
vectorized::Arena& arena)
             : ColumnPredicate(column_id, is_opposite),
               _min_value(type_limit<T>::max()),
               _max_value(type_limit<T>::min()) {
@@ -76,10 +77,10 @@ public:
         for (const auto& condition : conditions) {
             T tmp;
             if constexpr (Type == TYPE_STRING || Type == TYPE_CHAR) {
-                tmp = convert(*col, condition, arena);
+                tmp = convert(data_type, condition, arena);
             } else if constexpr (Type == TYPE_DECIMAL32 || Type == 
TYPE_DECIMAL64 ||
                                  Type == TYPE_DECIMAL128I || Type == 
TYPE_DECIMAL256) {
-                tmp = convert(*col, condition);
+                tmp = convert(data_type, condition);
             } else {
                 tmp = convert(condition);
             }
@@ -245,6 +246,42 @@ public:
         }
     }
 
+    /**
+     * Decide from Parquet min/max statistics whether this page may contain
+     * matching rows.
+     *
+     * Returns true (keep the page) when the statistics callback returns false,
+     * when min/max cannot be decoded, or for any predicate type other than
+     * IN_LIST. For IN_LIST the page is kept only if the page interval
+     * [min, max] overlaps [`_min_value`, `_max_value`].
+     */
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        // The callback is handed the stat object and this column's id; if it
+        // returns false there is nothing to prune with, so keep the page.
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        vectorized::Field min_field;
+        vectorized::Field max_field;
+        // Undecodable min/max values are treated the same way: keep the page.
+        if (!vectorized::ParquetPredicate::get_min_max_value(
+                     statistic->col_schema, statistic->encoded_min_value,
+                     statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
+                     .ok()) {
+            return true;
+        }
+
+        // Integer/bool/floating-point values are read from the Field as the
+        // nearest field type and converted to the predicate's C++ type; every
+        // other type is read as the C++ type directly.
+        const auto to_native = [](vectorized::Field& field) -> T {
+            using NativeType = typename PrimitiveTypeTraits<Type>::CppType;
+            if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
+                using NearestType = typename PrimitiveTypeTraits<Type>::NearestFieldType;
+                return static_cast<NativeType>(field.template get<NearestType>());
+            } else {
+                return field.template get<NativeType>();
+            }
+        };
+        const T min_value = to_native(min_field);
+        const T max_value = to_native(max_field);
+
+        if constexpr (PT == PredicateType::IN_LIST) {
+            // Interval-overlap test. `_min_value`/`_max_value` are presumably
+            // the smallest and largest values of the list - confirm against the
+            // constructor. The original repeated this same conjunction on both
+            // sides of an `||`; a single copy is equivalent.
+            return Compare::less_equal(min_value, _max_value) &&
+                   Compare::greater_equal(max_value, _min_value);
+        } else {
+            // Other predicate types (e.g. NOT IN) are never pruned by min/max.
+            return true;
+        }
+    }
+
     bool evaluate_and(const StringRef* dict_words, const size_t count) const 
override {
         for (size_t i = 0; i != count; ++i) {
             const auto found = _values->find(dict_words[i].data, 
dict_words[i].size) ^ _opposite;
@@ -548,7 +585,8 @@ template <PrimitiveType Type, PredicateType PT, typename 
ConditionType, typename
           size_t N = 0>
 ColumnPredicate* _create_in_list_predicate(uint32_t column_id, const 
ConditionType& conditions,
                                            const ConvertFunc& convert, bool 
is_opposite,
-                                           const TabletColumn* col, 
vectorized::Arena& arena) {
+                                           const vectorized::DataTypePtr& 
data_type,
+                                           vectorized::Arena& arena) {
     using T = typename PrimitiveTypeTraits<Type>::CppType;
     if constexpr (N >= 1 && N <= FIXED_CONTAINER_MAX_SIZE) {
         using Set = std::conditional_t<
@@ -556,49 +594,50 @@ ColumnPredicate* _create_in_list_predicate(uint32_t 
column_id, const ConditionTy
                 HybridSet<Type, FixedContainer<T, N>,
                           
vectorized::PredicateColumnType<PredicateEvaluateType<Type>>>>;
         return new InListPredicateBase<Type, PT, Set>(column_id, conditions, 
convert, is_opposite,
-                                                      col, arena);
+                                                      data_type, arena);
     } else {
         using Set = std::conditional_t<
                 std::is_same_v<T, StringRef>, 
StringSet<DynamicContainer<std::string>>,
                 HybridSet<Type, DynamicContainer<T>,
                           
vectorized::PredicateColumnType<PredicateEvaluateType<Type>>>>;
         return new InListPredicateBase<Type, PT, Set>(column_id, conditions, 
convert, is_opposite,
-                                                      col, arena);
+                                                      data_type, arena);
     }
 }
 
 template <PrimitiveType Type, PredicateType PT, typename ConditionType, 
typename ConvertFunc>
 ColumnPredicate* create_in_list_predicate(uint32_t column_id, const 
ConditionType& conditions,
                                           const ConvertFunc& convert, bool 
is_opposite,
-                                          const TabletColumn* col, 
vectorized::Arena& arena) {
+                                          const vectorized::DataTypePtr& 
data_type,
+                                          vectorized::Arena& arena) {
     if (conditions.size() == 1) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
1>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 2) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
2>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 3) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
3>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 4) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
4>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 5) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
5>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 6) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
6>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == 7) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc, 
7>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     } else if (conditions.size() == FIXED_CONTAINER_MAX_SIZE) {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc,
                                          FIXED_CONTAINER_MAX_SIZE>(column_id, 
conditions, convert,
-                                                                   
is_opposite, col, arena);
+                                                                   
is_opposite, data_type, arena);
     } else {
         return _create_in_list_predicate<Type, PT, ConditionType, ConvertFunc>(
-                column_id, conditions, convert, is_opposite, col, arena);
+                column_id, conditions, convert, is_opposite, data_type, arena);
     }
 }
 
diff --git a/be/src/olap/null_predicate.h b/be/src/olap/null_predicate.h
index cf3a1bd0dbd..5e51e25e8c5 100644
--- a/be/src/olap/null_predicate.h
+++ b/be/src/olap/null_predicate.h
@@ -66,6 +66,17 @@ public:
         }
     }
 
+    // Statistics-based check for the null predicate. The page is kept (true)
+    // when the statistics callback returns false, or when `_is_null` is set;
+    // otherwise it is kept only if the statistics do not mark the column as
+    // entirely null.
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        return _is_null || !statistic->is_all_null;
+    }
+
     bool evaluate_del(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override {
         // evaluate_del only use for delete condition to filter page, need use 
delete condition origin value,
         // when opposite==true, origin value 'is null'->'is not null' and 'is 
not null'->'is null',
diff --git a/be/src/olap/predicate_creator.h b/be/src/olap/predicate_creator.h
index 89b8cb70a03..7bf5b65181a 100644
--- a/be/src/olap/predicate_creator.h
+++ b/be/src/olap/predicate_creator.h
@@ -40,6 +40,7 @@
 #include "util/date_func.h"
 #include "util/string_util.h"
 #include "vec/common/string_ref.h"
+#include "vec/data_types/data_type.h"
 #include "vec/functions/cast/cast_parameters.h"
 #include "vec/functions/cast/cast_to_basic_number_common.h"
 
@@ -48,7 +49,7 @@ namespace doris {
 template <typename ConditionType>
 class PredicateCreator {
 public:
-    virtual ColumnPredicate* create(const TabletColumn& column, int index,
+    virtual ColumnPredicate* create(const vectorized::DataTypePtr& data_type, 
int index,
                                     const ConditionType& conditions, bool 
opposite,
                                     vectorized::Arena& arena) = 0;
     virtual ~PredicateCreator() = default;
@@ -58,11 +59,12 @@ template <PrimitiveType Type, PredicateType PT, typename 
ConditionType>
 class IntegerPredicateCreator : public PredicateCreator<ConditionType> {
 public:
     using CppType = typename PrimitiveTypeTraits<Type>::CppType;
-    ColumnPredicate* create(const TabletColumn& column, int index, const 
ConditionType& conditions,
-                            bool opposite, vectorized::Arena& arena) override {
+    ColumnPredicate* create(const vectorized::DataTypePtr& data_type, int 
index,
+                            const ConditionType& conditions, bool opposite,
+                            vectorized::Arena& arena) override {
         if constexpr (PredicateTypeTraits::is_list(PT)) {
             return create_in_list_predicate<Type, PT, ConditionType, 
decltype(convert)>(
-                    index, conditions, convert, opposite, &column, arena);
+                    index, conditions, convert, opposite, data_type, arena);
         } else {
             static_assert(PredicateTypeTraits::is_comparison(PT));
             return new ComparisonPredicateBase<Type, PT>(index, 
convert(conditions), opposite);
@@ -102,49 +104,55 @@ template <PrimitiveType Type, PredicateType PT, typename 
ConditionType>
 class DecimalPredicateCreator : public PredicateCreator<ConditionType> {
 public:
     using CppType = typename PrimitiveTypeTraits<Type>::CppType;
-    ColumnPredicate* create(const TabletColumn& column, int index, const 
ConditionType& conditions,
-                            bool opposite, vectorized::Arena& arena) override {
+    ColumnPredicate* create(const vectorized::DataTypePtr& data_type, int 
index,
+                            const ConditionType& conditions, bool opposite,
+                            vectorized::Arena& arena) override {
         if constexpr (PredicateTypeTraits::is_list(PT)) {
             return create_in_list_predicate<Type, PT, ConditionType, 
decltype(convert)>(
-                    index, conditions, convert, opposite, &column, arena);
+                    index, conditions, convert, opposite, data_type, arena);
         } else {
             static_assert(PredicateTypeTraits::is_comparison(PT));
-            return new ComparisonPredicateBase<Type, PT>(index, 
convert(column, conditions),
+            return new ComparisonPredicateBase<Type, PT>(index, 
convert(data_type, conditions),
                                                          opposite);
         }
     }
 
 private:
-    static CppType convert(const TabletColumn& column, const std::string& 
condition) {
+    static CppType convert(const vectorized::DataTypePtr& data_type, const 
std::string& condition) {
         StringParser::ParseResult result = 
StringParser::ParseResult::PARSE_SUCCESS;
         // return CppType value cast from int128_t
-        return CppType(
-                StringParser::string_to_decimal<Type>(condition.data(), 
(int)condition.size(),
-                                                      column.precision(), 
column.frac(), &result));
+        return CppType(StringParser::string_to_decimal<Type>(
+                condition.data(), (int)condition.size(), 
data_type->get_precision(),
+                data_type->get_scale(), &result));
     }
 };
 
 template <PrimitiveType Type, PredicateType PT, typename ConditionType>
 class StringPredicateCreator : public PredicateCreator<ConditionType> {
 public:
-    ColumnPredicate* create(const TabletColumn& column, int index, const 
ConditionType& conditions,
-                            bool opposite, vectorized::Arena& arena) override {
+    ColumnPredicate* create(const vectorized::DataTypePtr& data_type, int 
index,
+                            const ConditionType& conditions, bool opposite,
+                            vectorized::Arena& arena) override {
         if constexpr (PredicateTypeTraits::is_list(PT)) {
             return create_in_list_predicate<Type, PT, ConditionType, 
decltype(convert)>(
-                    index, conditions, convert, opposite, &column, arena);
+                    index, conditions, convert, opposite, data_type, arena);
         } else {
             static_assert(PredicateTypeTraits::is_comparison(PT));
-            return new ComparisonPredicateBase<Type, PT>(index, 
convert(column, conditions, arena),
-                                                         opposite);
+            return new ComparisonPredicateBase<Type, PT>(
+                    index, convert(data_type, conditions, arena), opposite);
         }
     }
 
 private:
-    static StringRef convert(const TabletColumn& column, const std::string& 
condition,
+    static StringRef convert(const vectorized::DataTypePtr& data_type, const 
std::string& condition,
                              vectorized::Arena& arena) {
         size_t length = condition.length();
         if constexpr (Type == TYPE_CHAR) {
-            length = std::max(static_cast<size_t>(column.length()), length);
+            length = std::max(
+                    static_cast<size_t>(assert_cast<const 
vectorized::DataTypeString*>(
+                                                
vectorized::remove_nullable(data_type).get())
+                                                ->len()),
+                    length);
         }
 
         char* buffer = arena.alloc(length);
@@ -162,11 +170,12 @@ public:
     CustomPredicateCreator(const std::function<CppType(const std::string& 
condition)>& convert)
             : _convert(convert) {}
 
-    ColumnPredicate* create(const TabletColumn& column, int index, const 
ConditionType& conditions,
-                            bool opposite, vectorized::Arena& arena) override {
+    ColumnPredicate* create(const vectorized::DataTypePtr& data_type, int 
index,
+                            const ConditionType& conditions, bool opposite,
+                            vectorized::Arena& arena) override {
         if constexpr (PredicateTypeTraits::is_list(PT)) {
             return create_in_list_predicate<Type, PT, ConditionType, 
decltype(_convert)>(
-                    index, conditions, _convert, opposite, &column, arena);
+                    index, conditions, _convert, opposite, data_type, arena);
         } else {
             static_assert(PredicateTypeTraits::is_comparison(PT));
             return new ComparisonPredicateBase<Type, PT>(index, 
_convert(conditions), opposite);
@@ -178,30 +187,31 @@ private:
 };
 
 template <PredicateType PT, typename ConditionType>
-std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& 
type) {
-    switch (type) {
-    case FieldType::OLAP_FIELD_TYPE_TINYINT: {
+std::unique_ptr<PredicateCreator<ConditionType>> get_creator(
+        const vectorized::DataTypePtr& data_type) {
+    switch (data_type->get_primitive_type()) {
+    case TYPE_TINYINT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_TINYINT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
+    case TYPE_SMALLINT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_SMALLINT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_INT: {
+    case TYPE_INT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_INT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_BIGINT: {
+    case TYPE_BIGINT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_BIGINT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
+    case TYPE_LARGEINT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_LARGEINT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_FLOAT: {
+    case TYPE_FLOAT: {
         return std::make_unique<IntegerPredicateCreator<TYPE_FLOAT, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
+    case TYPE_DOUBLE: {
         return std::make_unique<IntegerPredicateCreator<TYPE_DOUBLE, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
+    case TYPE_DECIMALV2: {
         return std::make_unique<CustomPredicateCreator<TYPE_DECIMALV2, PT, 
ConditionType>>(
                 [](const std::string& condition) {
                     decimal12_t value = {0, 0};
@@ -211,46 +221,46 @@ std::unique_ptr<PredicateCreator<ConditionType>> 
get_creator(const FieldType& ty
                     return DecimalV2Value(value.integer, value.fraction);
                 });
     }
-    case FieldType::OLAP_FIELD_TYPE_DECIMAL32: {
+    case TYPE_DECIMAL32: {
         return std::make_unique<DecimalPredicateCreator<TYPE_DECIMAL32, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DECIMAL64: {
+    case TYPE_DECIMAL64: {
         return std::make_unique<DecimalPredicateCreator<TYPE_DECIMAL64, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DECIMAL128I: {
+    case TYPE_DECIMAL128I: {
         return std::make_unique<DecimalPredicateCreator<TYPE_DECIMAL128I, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DECIMAL256: {
+    case TYPE_DECIMAL256: {
         return std::make_unique<DecimalPredicateCreator<TYPE_DECIMAL256, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_CHAR: {
+    case TYPE_CHAR: {
         return std::make_unique<StringPredicateCreator<TYPE_CHAR, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_VARCHAR:
-    case FieldType::OLAP_FIELD_TYPE_STRING: {
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
         return std::make_unique<StringPredicateCreator<TYPE_STRING, PT, 
ConditionType>>();
     }
-    case FieldType::OLAP_FIELD_TYPE_DATE: {
+    case TYPE_DATE: {
         return std::make_unique<CustomPredicateCreator<TYPE_DATE, PT, 
ConditionType>>(
                 timestamp_from_date);
     }
-    case FieldType::OLAP_FIELD_TYPE_DATEV2: {
+    case TYPE_DATEV2: {
         return std::make_unique<CustomPredicateCreator<TYPE_DATEV2, PT, 
ConditionType>>(
                 timestamp_from_date_v2);
     }
-    case FieldType::OLAP_FIELD_TYPE_DATETIME: {
+    case TYPE_DATETIME: {
         return std::make_unique<CustomPredicateCreator<TYPE_DATETIME, PT, 
ConditionType>>(
                 timestamp_from_datetime);
     }
-    case FieldType::OLAP_FIELD_TYPE_DATETIMEV2: {
+    case TYPE_DATETIMEV2: {
         return std::make_unique<CustomPredicateCreator<TYPE_DATETIMEV2, PT, 
ConditionType>>(
                 timestamp_from_datetime_v2);
     }
-    case FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ: {
+    case TYPE_TIMESTAMPTZ: {
         return std::make_unique<CustomPredicateCreator<TYPE_TIMESTAMPTZ, PT, 
ConditionType>>(
                 timestamptz_from_string);
     }
-    case FieldType::OLAP_FIELD_TYPE_BOOL: {
+    case TYPE_BOOLEAN: {
         return std::make_unique<CustomPredicateCreator<TYPE_BOOLEAN, PT, 
ConditionType>>(
                 [](const std::string& condition) {
                     int32_t ivalue = 0;
@@ -266,7 +276,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> 
get_creator(const FieldType& ty
                     return value;
                 });
     }
-    case FieldType::OLAP_FIELD_TYPE_IPV4: {
+    case TYPE_IPV4: {
         return std::make_unique<CustomPredicateCreator<TYPE_IPV4, PT, 
ConditionType>>(
                 [](const std::string& condition) {
                     IPv4 value;
@@ -275,7 +285,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> 
get_creator(const FieldType& ty
                     return value;
                 });
     }
-    case FieldType::OLAP_FIELD_TYPE_IPV6: {
+    case TYPE_IPV6: {
         return std::make_unique<CustomPredicateCreator<TYPE_IPV6, PT, 
ConditionType>>(
                 [](const std::string& condition) {
                     IPv6 value;
@@ -290,33 +300,33 @@ std::unique_ptr<PredicateCreator<ConditionType>> 
get_creator(const FieldType& ty
 }
 
 template <PredicateType PT, typename ConditionType>
-ColumnPredicate* create_predicate(const TabletColumn& column, int index,
+ColumnPredicate* create_predicate(const vectorized::DataTypePtr& data_type, 
int index,
                                   const ConditionType& conditions, bool 
opposite,
                                   vectorized::Arena& arena) {
-    return get_creator<PT, ConditionType>(column.type())
-            ->create(column, index, conditions, opposite, arena);
+    return get_creator<PT, ConditionType>(data_type)->create(data_type, index, 
conditions, opposite,
+                                                             arena);
 }
 
 template <PredicateType PT>
-ColumnPredicate* create_comparison_predicate(const TabletColumn& column, int 
index,
+ColumnPredicate* create_comparison_predicate(const vectorized::DataTypePtr& 
data_type, int index,
                                              const std::string& condition, 
bool opposite,
                                              vectorized::Arena& arena) {
     static_assert(PredicateTypeTraits::is_comparison(PT));
-    return create_predicate<PT, std::string>(column, index, condition, 
opposite, arena);
+    return create_predicate<PT, std::string>(data_type, index, condition, 
opposite, arena);
 }
 
 template <PredicateType PT>
-ColumnPredicate* create_list_predicate(const TabletColumn& column, int index,
+ColumnPredicate* create_list_predicate(const vectorized::DataTypePtr& 
data_type, int index,
                                        const std::vector<std::string>& 
conditions, bool opposite,
                                        vectorized::Arena& arena) {
     static_assert(PredicateTypeTraits::is_list(PT));
-    return create_predicate<PT, std::vector<std::string>>(column, index, 
conditions, opposite,
+    return create_predicate<PT, std::vector<std::string>>(data_type, index, 
conditions, opposite,
                                                           arena);
 }
 
 // This method is called in reader and in deletehandler.
 // The "column" parameter might represent a column resulting from the 
decomposition of a variant column.
-inline ColumnPredicate* parse_to_predicate(const TabletColumn& column, 
uint32_t index,
+inline ColumnPredicate* parse_to_predicate(const vectorized::DataTypePtr& 
data_type, uint32_t index,
                                            const TCondition& condition, 
vectorized::Arena& arena,
                                            bool opposite = false) {
     if (to_lower(condition.condition_op) == "is") {
@@ -333,7 +343,7 @@ inline ColumnPredicate* parse_to_predicate(const 
TabletColumn& column, uint32_t
         } else {
             create = create_list_predicate<PredicateType::NOT_IN_LIST>;
         }
-        return create(column, index, condition.condition_values, opposite, 
arena);
+        return create(data_type, index, condition.condition_values, opposite, 
arena);
     }
 
     decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* create = 
nullptr;
@@ -350,7 +360,7 @@ inline ColumnPredicate* parse_to_predicate(const 
TabletColumn& column, uint32_t
     } else if (condition.condition_op == ">=") {
         create = create_comparison_predicate<PredicateType::GE>;
     }
-    return create(column, index, condition.condition_values[0], opposite, 
arena);
+    return create(data_type, index, condition.condition_values[0], opposite, 
arena);
 }
 #include "common/compile_check_end.h"
 } //namespace doris
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index fb5c5121da7..800485f6bb0 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -540,7 +540,8 @@ Status TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
         const auto& column = 
*DORIS_TRY(_tablet_schema->column(tmp_cond.column_name));
         const auto& mcolumn = materialize_column(column);
         uint32_t index = _tablet_schema->field_index(tmp_cond.column_name);
-        ColumnPredicate* predicate = parse_to_predicate(mcolumn, index, 
tmp_cond, _predicate_arena);
+        ColumnPredicate* predicate =
+                parse_to_predicate(mcolumn.get_vec_type(), index, tmp_cond, 
_predicate_arena);
         // record condition value into predicate_params in order to pushdown 
segment_iterator,
         // _gen_predicate_result_sign will build predicate result unique sign 
with condition value
         predicate->attach_profile_counter(param.runtime_filter_id, 
param.filtered_rows_counter,
diff --git a/be/src/runtime/runtime_predicate.cpp 
b/be/src/runtime/runtime_predicate.cpp
index 1269d6b5666..2c763dfcb98 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -240,9 +240,9 @@ Status RuntimePredicate::update(const Field& value) {
             continue;
         }
         const auto& column = 
*DORIS_TRY(ctx.tablet_schema->column(ctx.col_name));
-        std::unique_ptr<ColumnPredicate> pred {_pred_constructor(column, 
ctx.predicate->column_id(),
-                                                                 
_get_value_fn(_orderby_extrem),
-                                                                 false, 
_predicate_arena)};
+        std::unique_ptr<ColumnPredicate> pred {
+                _pred_constructor(column.get_vec_type(), 
ctx.predicate->column_id(),
+                                  _get_value_fn(_orderby_extrem), false, 
_predicate_arena)};
 
         // For NULLS FIRST, wrap a AcceptNullPredicate to return true for NULL
         // since ORDER BY ASC/DESC should get NULL first but pred returns NULL
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index e85cc890ca5..51c79e1b426 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -154,7 +154,7 @@ private:
     Field _orderby_extrem {PrimitiveType::TYPE_NULL};
     Arena _predicate_arena;
     std::function<std::string(const Field&)> _get_value_fn;
-    std::function<ColumnPredicate*(const TabletColumn&, int, const 
std::string&, bool,
+    std::function<ColumnPredicate*(const DataTypePtr&, int, const 
std::string&, bool,
                                    vectorized::Arena&)>
             _pred_constructor;
     bool _detected_source = false;
diff --git a/be/src/vec/core/field.cpp b/be/src/vec/core/field.cpp
index c0b03df7c3c..b378885ae65 100644
--- a/be/src/vec/core/field.cpp
+++ b/be/src/vec/core/field.cpp
@@ -791,6 +791,76 @@ std::string_view Field::as_string_view() const {
 
 #undef MATCH_PRIMITIVE_TYPE
 
+#define MATCH_PRIMITIVE_TYPE(primite_type)                                     
              \
+    if (type == primite_type) {                                                
              \
+        const auto& v = get<typename 
PrimitiveTypeTraits<primite_type>::NearestFieldType>(); \
+        return std::to_string(v);                                              
              \
+    }
+
+std::string Field::to_string() const {
+    if (type == PrimitiveType::TYPE_STRING || type == 
PrimitiveType::TYPE_VARCHAR ||
+        type == PrimitiveType::TYPE_CHAR) {
+        const auto& s = get<String>();
+        return {s.data(), s.size()};
+    }
+    if (type == TYPE_DECIMAL32) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL32>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL64) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL64>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMALV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMALV2>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL128I) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL128I>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL256) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL256>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_LARGEINT) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_LARGEINT>::NearestFieldType>();
+        return int128_to_string(v);
+    }
+    if (type == TYPE_DATE || type == TYPE_DATETIME) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATE>::NearestFieldType>();
+        std::string buf(40, 0);
+        auto* to = binary_cast<int64_t, 
doris::VecDateTimeValue>(v).to_string(buf.data());
+        buf.resize(to - buf.data() - 1);
+        return buf;
+    }
+    if (type == TYPE_DATEV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATEV2>::NearestFieldType>();
+        return binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>((uint32_t)v).to_string();
+    }
+    if (type == TYPE_DATETIMEV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATETIMEV2>::NearestFieldType>();
+        return binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(v).to_string();
+    }
+    MATCH_PRIMITIVE_TYPE(TYPE_BOOLEAN);
+    MATCH_PRIMITIVE_TYPE(TYPE_TINYINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_SMALLINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_INT);
+    MATCH_PRIMITIVE_TYPE(TYPE_BIGINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_FLOAT);
+    MATCH_PRIMITIVE_TYPE(TYPE_DOUBLE);
+    MATCH_PRIMITIVE_TYPE(TYPE_TIME);
+    MATCH_PRIMITIVE_TYPE(TYPE_TIMEV2);
+    //    MATCH_PRIMITIVE_TYPE(TYPE_IPV4);
+    //    MATCH_PRIMITIVE_TYPE(TYPE_IPV6);
+    MATCH_PRIMITIVE_TYPE(TYPE_UINT32);
+    MATCH_PRIMITIVE_TYPE(TYPE_UINT64);
+    throw Exception(
+            Status::FatalError("type not supported for to_string, type={}", 
get_type_name()));
+}
+
+#undef MATCH_PRIMITIVE_TYPE
+
 #define DECLARE_FUNCTION(FUNC_NAME)                                            
              \
     template void Field::FUNC_NAME<TYPE_NULL>(                                 
              \
             typename PrimitiveTypeTraits<TYPE_NULL>::NearestFieldType && rhs); 
              \
diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h
index 3ef504dc0d6..d0045cd73d8 100644
--- a/be/src/vec/core/field.h
+++ b/be/src/vec/core/field.h
@@ -564,6 +564,7 @@ public:
     }
 
     std::string_view as_string_view() const;
+    std::string to_string() const;
 
 private:
     std::aligned_union_t<DBMS_MIN_FIELD_SIZE - sizeof(PrimitiveType), Null, 
UInt64, UInt128, Int64,
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
index 3c17ae79016..5849781a2e5 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -107,8 +107,8 @@ DataTypePtr DataTypeFactory::create_data_type(const 
TabletColumn& col_desc, bool
     } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
         nested = 
std::make_shared<DataTypeVariant>(col_desc.variant_max_subcolumns_count());
     } else {
-        nested =
-                _create_primitive_data_type(col_desc.type(), 
col_desc.precision(), col_desc.frac());
+        nested = _create_primitive_data_type(col_desc.type(), 
col_desc.precision(), col_desc.frac(),
+                                             col_desc.length());
     }
 
     if ((is_nullable || col_desc.is_nullable()) && nested) {
@@ -118,7 +118,7 @@ DataTypePtr DataTypeFactory::create_data_type(const 
TabletColumn& col_desc, bool
 }
 
 DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& 
type, int precision,
-                                                         int scale) const {
+                                                         int scale, int 
length) const {
     DataTypePtr result = nullptr;
     switch (type) {
     case FieldType::OLAP_FIELD_TYPE_BOOL:
@@ -167,7 +167,7 @@ DataTypePtr 
DataTypeFactory::_create_primitive_data_type(const FieldType& type,
         result = std::make_shared<vectorized::DataTypeFloat64>();
         break;
     case FieldType::OLAP_FIELD_TYPE_CHAR:
-        result = std::make_shared<vectorized::DataTypeString>(-1, TYPE_CHAR);
+        result = std::make_shared<vectorized::DataTypeString>(length, 
TYPE_CHAR);
         break;
     case FieldType::OLAP_FIELD_TYPE_VARCHAR:
         result = std::make_shared<vectorized::DataTypeString>(-1, 
TYPE_VARCHAR);
@@ -386,7 +386,7 @@ DataTypePtr DataTypeFactory::create_data_type(const 
segment_v2::ColumnMetaPB& pc
     } else {
         // TODO add precision and frac
         nested = 
_create_primitive_data_type(static_cast<FieldType>(pcolumn.type()),
-                                             pcolumn.precision(), 
pcolumn.frac());
+                                             pcolumn.precision(), 
pcolumn.frac(), -1);
     }
 
     if (pcolumn.is_nullable() && nested) {
diff --git a/be/src/vec/data_types/data_type_factory.hpp 
b/be/src/vec/data_types/data_type_factory.hpp
index a0fda8885d3..1b15fb6aff0 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -67,7 +67,7 @@ public:
     DataTypePtr create_data_type(const TTypeDesc& raw_type, bool is_nullable);
 
     DataTypePtr create_data_type(const FieldType& type, int precision, int 
scale) {
-        return _create_primitive_data_type(type, precision, scale);
+        return _create_primitive_data_type(type, precision, scale, -1);
     }
     // Create DataType by PrimitiveType (only for naive types)
     DataTypePtr create_data_type(const PrimitiveType primitive_type, bool 
is_nullable,
@@ -86,7 +86,8 @@ public:
     }
 
 private:
-    DataTypePtr _create_primitive_data_type(const FieldType& type, int 
precision, int scale) const;
+    DataTypePtr _create_primitive_data_type(const FieldType& type, int 
precision, int scale,
+                                            int length) const;
 
     std::string _empty_string;
 };
diff --git a/be/src/vec/exec/format/generic_reader.cpp 
b/be/src/vec/exec/format/generic_reader.cpp
new file mode 100644
index 00000000000..8b3339faede
--- /dev/null
+++ b/be/src/vec/exec/format/generic_reader.cpp
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/generic_reader.h"
+
+#include "olap/predicate_creator.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/exprs/vtopn_pred.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status ExprPushDownHelper::_extract_predicates(const VExprSPtr& expr, int& cid,
+                                               DataTypePtr& data_type, 
std::vector<Field>& values,
+                                               bool null_pred, bool& parsed) 
const {
+    parsed = false;
+    values.clear();
+    if (!expr->children()[0]->is_slot_ref()) [[unlikely]] {
+        return Status::OK();
+    }
+    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
+    cid = slot_ref->column_id();
+    values.reserve(expr->children().size() - 1);
+    data_type = remove_nullable(slot_ref->data_type());
+    if (null_pred) {
+        DCHECK_EQ(expr->children().size(), 1);
+        parsed = true;
+    }
+    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
+        auto child_expr = expr->children()[child_id];
+        if (!child_expr->is_literal()) {
+            return Status::OK();
+        }
+        const auto* literal = static_cast<const VLiteral*>(child_expr.get());
+        if (literal->get_column_ptr()->is_null_at(0)) {
+            continue;
+        }
+        values.emplace_back(literal->get_column_ptr()->operator[](0));
+        parsed = true;
+    }
+    return Status::OK();
+}
+
+Status ExprPushDownHelper::convert_predicates(
+        const VExprSPtrs& exprs, 
std::vector<std::unique_ptr<ColumnPredicate>>& predicates,
+        std::unique_ptr<MutilColumnBlockPredicate>& root, Arena& arena) {
+    if (exprs.empty()) {
+        return Status::OK();
+    }
+
+    int cid;
+    DataTypePtr data_type;
+    std::vector<Field> values;
+    bool parsed = false;
+    for (const auto& expr : exprs) {
+        cid = -1;
+        values.clear();
+        parsed = false;
+        switch (expr->node_type()) {
+        case TExprNodeType::BINARY_PRED: {
+            decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* 
create = nullptr;
+            if (expr->op() == TExprOpcode::EQ) {
+                create = create_comparison_predicate<PredicateType::EQ>;
+            } else if (expr->op() == TExprOpcode::NE) {
+                create = create_comparison_predicate<PredicateType::NE>;
+            } else if (expr->op() == TExprOpcode::LT) {
+                create = create_comparison_predicate<PredicateType::LT>;
+            } else if (expr->op() == TExprOpcode::LE) {
+                create = create_comparison_predicate<PredicateType::LE>;
+            } else if (expr->op() == TExprOpcode::GT) {
+                create = create_comparison_predicate<PredicateType::GT>;
+            } else if (expr->op() == TExprOpcode::GE) {
+                create = create_comparison_predicate<PredicateType::GE>;
+            } else {
+                break;
+            }
+            RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, values, 
false, parsed));
+            if (parsed) {
+                // TODO(gabriel): Use string view
+                predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                        create(data_type, cid, values[0].to_string(), false, 
arena)));
+                root->add_column_predicate(
+                        
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+            }
+            break;
+        }
+        case TExprNodeType::IN_PRED: {
+            switch (expr->op()) {
+            case TExprOpcode::FILTER_IN: {
+                RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, 
values, false, parsed));
+                if (parsed) {
+                    // TODO(gabriel): Use string view
+                    std::vector<std::string> conditions(values.size());
+                    for (size_t i = 0; i < conditions.size(); i++) {
+                        conditions[i] = values[i].to_string();
+                    }
+                    predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                            create_list_predicate<PredicateType::IN_LIST>(
+                                    data_type, cid, conditions, false, 
arena)));
+                    root->add_column_predicate(
+                            
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+                }
+                break;
+            }
+            default: {
+                break;
+            }
+            }
+            break;
+        }
+        case TExprNodeType::COMPOUND_PRED: {
+            switch (expr->op()) {
+            case TExprOpcode::COMPOUND_AND: {
+                for (const auto& child : expr->children()) {
+                    RETURN_IF_ERROR(convert_predicates({child}, predicates, 
root, arena));
+                }
+                break;
+            }
+            case TExprOpcode::COMPOUND_OR: {
+                std::unique_ptr<MutilColumnBlockPredicate> new_root =
+                        OrBlockColumnPredicate::create_unique();
+                for (const auto& child : expr->children()) {
+                    RETURN_IF_ERROR(convert_predicates({child}, predicates, 
new_root, arena));
+                }
+                root->add_column_predicate(std::move(new_root));
+                break;
+            }
+            default: {
+                break;
+            }
+            }
+            break;
+        }
+        case TExprNodeType::FUNCTION_CALL: {
+            auto fn_name = expr->fn().name.function_name;
+            // only support `is null` and `is not null`
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, 
values, true, parsed));
+                if (parsed) {
+                    predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                            new NullPredicate(cid, true, fn_name == 
"is_not_null_pred")));
+                    root->add_column_predicate(
+                            
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+                }
+            }
+            break;
+        }
+        default:
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+bool ExprPushDownHelper::check_expr_can_push_down(const VExprSPtr& expr) const 
{
+    if (expr == nullptr) {
+        return false;
+    }
+
+    switch (expr->node_type()) {
+    case TExprNodeType::BINARY_PRED:
+    case TExprNodeType::IN_PRED: {
+        switch (expr->op()) {
+        case TExprOpcode::GE:
+        case TExprOpcode::GT:
+        case TExprOpcode::LE:
+        case TExprOpcode::LT:
+        case TExprOpcode::EQ:
+        case TExprOpcode::FILTER_IN:
+            return _check_slot_can_push_down(expr) && 
_check_other_children_is_literal(expr);
+        default: {
+            return false;
+        }
+        }
+    }
+    case TExprNodeType::COMPOUND_PRED: {
+        switch (expr->op()) {
+        case TExprOpcode::COMPOUND_AND: {
+            // at least one child can be pushed down
+            return std::ranges::any_of(expr->children(), [this](const auto& 
child) {
+                return check_expr_can_push_down(child);
+            });
+        }
+        case TExprOpcode::COMPOUND_OR: {
+            // all children must be pushed down
+            return std::ranges::all_of(expr->children(), [this](const auto& 
child) {
+                return check_expr_can_push_down(child);
+            });
+        }
+        default: {
+            return false;
+        }
+        }
+    }
+    case TExprNodeType::FUNCTION_CALL: {
+        auto fn_name = expr->fn().name.function_name;
+        // only support `is null` and `is not null`
+        if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+            return _check_slot_can_push_down(expr);
+        }
+        return false;
+    }
+    default: {
+        return false;
+    }
+    }
+}
+
+bool ExprPushDownHelper::_check_slot_can_push_down(const VExprSPtr& expr) 
const {
+    if (!expr->children()[0]->is_slot_ref()) {
+        return false;
+    }
+
+    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
+    // check if the slot exists in the file being read.
+    if (!_exists_in_file(slot_ref)) {
+        return false;
+    }
+    return _type_matches(slot_ref);
+}
+
+bool ExprPushDownHelper::_check_other_children_is_literal(const VExprSPtr& 
expr) const {
+    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
+        auto child_expr = expr->children()[child_id];
+        if (!child_expr->is_literal()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index 6c1c12781d2..b21971b7a3f 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -20,15 +20,21 @@
 #include <gen_cpp/PlanNodes_types.h>
 
 #include "common/status.h"
+#include "olap/block_column_predicate.h"
 #include "runtime/descriptors.h"
 #include "runtime/types.h"
 #include "util/profile_collector.h"
 #include "vec/exprs/vexpr_fwd.h"
 
+namespace doris {
+class ColumnPredicate;
+} // namespace doris
+
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
 class Block;
+class VSlotRef;
 // This a reader interface for all file readers.
 // A GenericReader is responsible for reading a file and return
 // a set of blocks with specified schema,
@@ -106,5 +112,25 @@ protected:
     FileMetaCache* _meta_cache = nullptr;
 };
 
+class ExprPushDownHelper {
+public:
+    ExprPushDownHelper() = default;
+    virtual ~ExprPushDownHelper() = default;
+    bool check_expr_can_push_down(const VExprSPtr& expr) const;
+    Status convert_predicates(const VExprSPtrs& exprs,
+                              std::vector<std::unique_ptr<ColumnPredicate>>& 
predicates,
+                              std::unique_ptr<MutilColumnBlockPredicate>& 
root, Arena& arena);
+
+protected:
+    virtual bool _exists_in_file(const VSlotRef*) const = 0;
+    virtual bool _type_matches(const VSlotRef*) const = 0;
+
+private:
+    bool _check_slot_can_push_down(const VExprSPtr& expr) const;
+    bool _check_other_children_is_literal(const VExprSPtr& expr) const;
+    Status _extract_predicates(const VExprSPtr& expr, int& cid, DataTypePtr& 
data_type,
+                               std::vector<Field>& values, bool null_pred, 
bool& parsed) const;
+};
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h 
b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 64dd7b7ce27..8f27b675207 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -156,17 +156,9 @@ public:
         std::string encoded_max_value;
         bool has_null;
         bool is_all_null;
-    };
-
-    enum OP {
-        EQ,
-        LT,
-        LE,
-        GT,
-        GE,
-        IS_NULL,
-        IS_NOT_NULL,
-        IN,
+        const FieldSchema* col_schema;
+        const cctz::time_zone* ctz;
+        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* 
get_stat_func;
     };
 
     static Status get_min_max_value(const FieldSchema* col_schema, const 
std::string& encoded_min,
@@ -395,56 +387,6 @@ public:
 
         return Status::OK();
     }
-
-    static bool check_can_filter(OP op, const std::vector<Field>& 
literal_values,
-                                 const ColumnStat& column_stat, const 
FieldSchema* col_schema,
-                                 cctz::time_zone* ctz) {
-        Field min_field;
-        Field max_field;
-        if (!ParquetPredicate::get_min_max_value(col_schema, 
column_stat.encoded_min_value,
-                                                 
column_stat.encoded_max_value, *ctz, &min_field,
-                                                 &max_field)) {
-            return false;
-        };
-
-        switch (op) {
-        case ParquetPredicate::OP::EQ:
-        case ParquetPredicate::OP::IN: {
-            for (const auto& in_value : literal_values) {
-                if (in_value.is_null() && column_stat.has_null) {
-                    return false;
-                }
-                if (min_field <= in_value && in_value <= max_field) {
-                    return false;
-                }
-            }
-            return true;
-        }
-        case ParquetPredicate::OP::LT: {
-            DCHECK(!literal_values[0].is_null());
-            return min_field >= literal_values[0];
-        }
-        case ParquetPredicate::OP::LE: {
-            DCHECK(!literal_values[0].is_null());
-            return min_field > literal_values[0];
-        }
-        case ParquetPredicate::OP::GT: {
-            DCHECK(!literal_values[0].is_null());
-            return max_field <= literal_values[0];
-        }
-        case ParquetPredicate::OP::GE: {
-            DCHECK(!literal_values[0].is_null());
-            return max_field < literal_values[0];
-        }
-        case ParquetPredicate::OP::IS_NULL: {
-            return !column_stat.has_null;
-        }
-        case ParquetPredicate::OP::IS_NOT_NULL: {
-            return column_stat.is_all_null;
-        }
-        }
-        return false;
-    }
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6d871759b67..f0f16bf6f72 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -372,71 +372,11 @@ Status ParquetReader::init_reader(
     return Status::OK();
 }
 
-bool ParquetReader::_check_expr_can_push_down(const VExprSPtr& expr) {
-    if (expr == nullptr) {
-        return false;
-    }
-
-    switch (expr->node_type()) {
-    case TExprNodeType::BINARY_PRED:
-    case TExprNodeType::IN_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::GE:
-        case TExprOpcode::GT:
-        case TExprOpcode::LE:
-        case TExprOpcode::LT:
-        case TExprOpcode::EQ:
-        case TExprOpcode::FILTER_IN:
-            return _check_slot_can_push_down(expr) && 
_check_other_children_is_literal(expr);
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::COMPOUND_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::COMPOUND_AND: {
-            // at least one child can be pushed down
-            return std::ranges::any_of(expr->children(), [this](const auto& 
child) {
-                return _check_expr_can_push_down(child);
-            });
-        }
-        case TExprOpcode::COMPOUND_OR: {
-            // all children must be pushed down
-            return std::ranges::all_of(expr->children(), [this](const auto& 
child) {
-                return _check_expr_can_push_down(child);
-            });
-        }
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::FUNCTION_CALL: {
-        auto fn_name = expr->fn().name.function_name;
-        // only support `is null` and `is not null`
-        if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
-            return _check_slot_can_push_down(expr);
-        }
-        return false;
-    }
-    default: {
-        return false;
-    }
-    }
+bool ParquetReader::_exists_in_file(const VSlotRef* slot_ref) const {
+    return _table_info_node_ptr->children_column_exists(slot_ref->expr_name());
 }
 
-bool ParquetReader::_check_slot_can_push_down(const VExprSPtr& expr) {
-    if (!expr->children()[0]->is_slot_ref()) {
-        return false;
-    }
-
-    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
-    // check if the slot exists in parquet file.
-    if (!_table_info_node_ptr->children_column_exists(slot_ref->expr_name())) {
-        return false;
-    }
-
+bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
     auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
     auto table_col_type = remove_nullable(slot->type());
 
@@ -444,126 +384,8 @@ bool ParquetReader::_check_slot_can_push_down(const 
VExprSPtr& expr) {
     const auto& file_col_type =
             
remove_nullable(_file_metadata->schema().get_column(file_col_name)->data_type);
 
-    // If a schema change occurs, the min and max values of the parquet file 
cannot be guaranteed to be valid for the current table.
-    if (table_col_type->get_primitive_type() != 
file_col_type->get_primitive_type() ||
-        is_complex_type(table_col_type->get_primitive_type())) {
-        return false;
-    }
-
-    return true;
-}
-
-bool ParquetReader::_check_other_children_is_literal(const VExprSPtr& expr) {
-    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
-        auto child_expr = expr->children()[child_id];
-        if (!child_expr->is_literal()) {
-            return false;
-        }
-    }
-    return true;
-}
-
-// Although we have already checked whether the children of expr are slots and 
literals in `_check_expr_can_push_down`, we still need to check again here
-// because the existence of the AND predicate will cause only some children to 
be checked.
-bool ParquetReader::_simple_expr_push_down(
-        const VExprSPtr& expr, ParquetPredicate::OP op,
-        const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                get_stat_func) {
-    if (!expr->children()[0]->is_slot_ref()) [[unlikely]] {
-        return false;
-    }
-    const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
-
-    std::vector<Field> literal_values(expr->children().size() - 1);
-    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
-        auto child_expr = expr->children()[child_id];
-        if (!child_expr->is_literal()) {
-            return false;
-        }
-        const auto* literal = static_cast<const VLiteral*>(child_expr.get());
-        if (literal->get_column_ptr()->is_null_at(0)) {
-            continue;
-        }
-        literal_values[child_id - 1] = 
literal->get_column_ptr()->operator[](0);
-    }
-
-    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
-    if (!_table_info_node_ptr->children_column_exists(slot->col_name())) {
-        return false;
-    }
-
-    const auto& file_col_name = 
_table_info_node_ptr->children_file_column_name(slot->col_name());
-    const FieldSchema* col_schema = 
_file_metadata->schema().get_column(file_col_name);
-
-    ParquetPredicate::ColumnStat column_stat;
-    if (!get_stat_func(col_schema, &column_stat)) {
-        return false;
-    }
-
-    return ParquetPredicate::check_can_filter(op, literal_values, column_stat, 
col_schema, _ctz);
-}
-
-bool ParquetReader::_expr_push_down(
-        const VExprSPtr& expr,
-        const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                get_stat_func) {
-    if (expr == nullptr) {
-        return false;
-    }
-
-    switch (expr->node_type()) {
-    case TExprNodeType::BINARY_PRED:
-    case TExprNodeType::IN_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::GE:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::GE, 
get_stat_func);
-        case TExprOpcode::GT:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::GT, 
get_stat_func);
-        case TExprOpcode::LE:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::LE, 
get_stat_func);
-        case TExprOpcode::LT:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::LT, 
get_stat_func);
-        case TExprOpcode::EQ:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::EQ, 
get_stat_func);
-        case TExprOpcode::FILTER_IN:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::IN, 
get_stat_func);
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::COMPOUND_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::COMPOUND_AND: {
-            return std::ranges::any_of(expr->children(), [&](const auto& 
child) {
-                return _expr_push_down(child, get_stat_func);
-            });
-        }
-        case TExprOpcode::COMPOUND_OR: {
-            return std::ranges::all_of(expr->children(), [&](const auto& 
child) {
-                return _expr_push_down(child, get_stat_func);
-            });
-        }
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::FUNCTION_CALL: {
-        auto fn_name = expr->fn().name.function_name;
-        // only support `is null` and `is not null`
-        if (fn_name == "is_null_pred") {
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::IS_NULL, 
get_stat_func);
-        } else if (fn_name == "is_not_null_pred") {
-            return _simple_expr_push_down(expr, 
ParquetPredicate::OP::IS_NOT_NULL, get_stat_func);
-        }
-        return false;
-    }
-    default: {
-        return false;
-    }
-    }
-    return false;
+    return (table_col_type->get_primitive_type() == 
file_col_type->get_primitive_type()) &&
+           !is_complex_type(table_col_type->get_primitive_type());
 }
 
 Status ParquetReader::set_fill_columns(
@@ -644,15 +466,21 @@ Status ParquetReader::set_fill_columns(
             visit_slot(expr.get());
         }
 
-        if (_check_expr_can_push_down(expr)) {
-            _push_down_exprs.emplace_back(expr);
+        if (check_expr_can_push_down(expr)) {
+            
_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
             if (expr->node_type() != TExprNodeType::COMPOUND_PRED) {
                 // for page index filter.
                 VSlotRef* slot_ref = 
static_cast<VSlotRef*>(expr->children()[0].get());
-                if (!_push_down_simple_expr.contains(slot_ref->slot_id())) {
-                    _push_down_simple_expr.emplace(slot_ref->slot_id(), 
VExprSPtrs {});
+                if 
(!_push_down_simple_predicates.contains(slot_ref->slot_id())) {
+                    _push_down_simple_predicates.emplace(
+                            slot_ref->slot_id(), 
std::vector<std::unique_ptr<ColumnPredicate>> {});
                 }
-                _push_down_simple_expr[slot_ref->slot_id()].emplace_back(expr);
+                RETURN_IF_ERROR(convert_predicates(
+                        {expr}, 
_push_down_simple_predicates[slot_ref->slot_id()],
+                        _push_down_predicates.back(), _arena));
+            } else {
+                RETURN_IF_ERROR(convert_predicates({expr}, _useless_predicates,
+                                                   
_push_down_predicates.back(), _arena));
             }
         }
     }
@@ -1119,7 +947,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
     RETURN_IF_ERROR(parse_offset_index());
 
     // Check if page index is needed for min-max filter.
-    if (!_enable_filter_by_min_max || _push_down_simple_expr.empty()) {
+    if (!_enable_filter_by_min_max) {
         read_whole_row_group();
         return Status::OK();
     }
@@ -1154,14 +982,15 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
             continue;
         }
         auto& chunk = row_group.columns[parquet_col_id];
-
         if (chunk.offset_index_length == 0) {
             continue;
         }
-        if (!_push_down_simple_expr.contains(slot_id)) {
+
+        if (!_push_down_simple_predicates.contains(slot_id) ||
+            _push_down_simple_predicates[slot_id].empty()) {
             continue;
         }
-        const auto& push_down_expr = _push_down_simple_expr[slot_id];
+        const auto& predicates = _push_down_simple_predicates[slot_id];
 
         if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
             continue;
@@ -1180,12 +1009,21 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
 
         for (int page_id = 0; page_id < num_of_pages; page_id++) {
-            std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)> get_stat_func =
-                    [&](const FieldSchema* col_schema, 
ParquetPredicate::ColumnStat* stat) {
+            std::function<bool(ParquetPredicate::ColumnStat*, int)> 
get_stat_func =
+                    [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                        auto* slot = _tuple_descriptor->slots()[cid];
+                        if 
(!_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                            return false;
+                        }
                         if (!column_index.__isset.null_counts) {
                             return false;
                         }
 
+                        const auto& file_col_name =
+                                
_table_info_node_ptr->children_file_column_name(slot->col_name());
+                        const FieldSchema* col_schema =
+                                
_file_metadata->schema().get_column(file_col_name);
+                        stat->col_schema = col_schema;
                         stat->is_all_null = column_index.null_pages[page_id];
                         stat->has_null = column_index.null_counts[page_id] > 0;
                         stat->encoded_min_value = encoded_min_vals[page_id];
@@ -1193,9 +1031,13 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
                         return true;
                     };
 
-            for (const auto& expr : push_down_expr) {
-                if (_expr_push_down(expr, get_stat_func)) {
+            ParquetPredicate::ColumnStat stat;
+            stat.ctz = _ctz;
+            stat.get_stat_func = &get_stat_func;
+            for (const auto& predicate : predicates) {
+                if (!predicate->evaluate_and(&stat)) {
                     skipped_page_range.emplace_back(page_id);
+                    break;
                 }
             }
         }
@@ -1285,20 +1127,32 @@ Status ParquetReader::_process_column_stat_filter(const 
tparquet::RowGroup& row_
         return Status::OK();
     }
 
-    std::function<bool(const FieldSchema*, ParquetPredicate::ColumnStat*)> 
get_stat_func =
-            [&](const FieldSchema* col_schema, ParquetPredicate::ColumnStat* 
stat) {
-                int parquet_col_id = col_schema->physical_column_index;
-                auto meta_data = row_group.columns[parquet_col_id].meta_data;
-
-                return ParquetPredicate::read_column_stats(col_schema, 
meta_data, &_ignored_stats,
-                                                           
_t_metadata->created_by, stat)
-                        .ok();
-            };
-
-    for (auto expr : _push_down_exprs) {
-        if (_expr_push_down(expr, get_stat_func)) {
+    for (const auto& predicate : _push_down_predicates) {
+        std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+                [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                    auto* slot = _tuple_descriptor->slots()[cid];
+                    if 
(!_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                        return false;
+                    }
+                    const auto& file_col_name =
+                            
_table_info_node_ptr->children_file_column_name(slot->col_name());
+                    const FieldSchema* col_schema =
+                            _file_metadata->schema().get_column(file_col_name);
+                    int parquet_col_id = col_schema->physical_column_index;
+                    auto meta_data = 
row_group.columns[parquet_col_id].meta_data;
+                    stat->col_schema = col_schema;
+                    return ParquetPredicate::read_column_stats(col_schema, 
meta_data,
+                                                               &_ignored_stats,
+                                                               
_t_metadata->created_by, stat)
+                            .ok();
+                };
+        ParquetPredicate::ColumnStat stat;
+        stat.ctz = _ctz;
+        stat.get_stat_func = &get_stat_func;
+
+        if (!predicate->evaluate_and(&stat)) {
             *filter_group = true;
-            break;
+            return Status::OK();
         }
     }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 3d9826c9582..a7f0ced69c7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -70,7 +70,7 @@ class VExprContext;
 
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-class ParquetReader : public GenericReader {
+class ParquetReader : public GenericReader, public ExprPushDownHelper {
     ENABLE_FACTORY_CREATOR(ParquetReader);
 
 public:
@@ -235,16 +235,8 @@ private:
 
     Status _set_read_one_line_impl() override { return Status::OK(); }
 
-    bool _expr_push_down(const VExprSPtr& expr,
-                         const std::function<bool(const FieldSchema*,
-                                                  
ParquetPredicate::ColumnStat*)>& get_stat_func);
-    bool _simple_expr_push_down(
-            const VExprSPtr& expr, ParquetPredicate::OP op,
-            const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                    get_stat_func);
-    bool _check_expr_can_push_down(const VExprSPtr& expr);
-    bool _check_slot_can_push_down(const VExprSPtr& expr);
-    bool _check_other_children_is_literal(const VExprSPtr& expr);
+    bool _exists_in_file(const VSlotRef* slot) const override;
+    bool _type_matches(const VSlotRef*) const override;
 
     RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams& _scan_params;
@@ -330,13 +322,13 @@ private:
     std::set<uint64_t> _column_ids;
     std::set<uint64_t> _filter_column_ids;
 
-    // push down =, >, <, >=, <=, in
-    VExprSPtrs _push_down_exprs;
+    std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = 
nullptr;
 
     // for page index filter. slot id => expr
-    std::map<int, VExprSPtrs> _push_down_simple_expr;
-
-    std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = 
nullptr;
+    std::map<int, std::vector<std::unique_ptr<ColumnPredicate>>> 
_push_down_simple_predicates;
+    std::vector<std::unique_ptr<MutilColumnBlockPredicate>> 
_push_down_predicates;
+    std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
+    Arena _arena;
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index cfd556d70e5..e9f5549b510 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -98,27 +98,6 @@ void ParquetOutputStream::set_written_len(int64_t 
written_len) {
     _written_len = written_len;
 }
 
-void ParquetBuildHelper::build_schema_repetition_type(
-        parquet::Repetition::type& parquet_repetition_type,
-        const TParquetRepetitionType::type& column_repetition_type) {
-    switch (column_repetition_type) {
-    case TParquetRepetitionType::REQUIRED: {
-        parquet_repetition_type = parquet::Repetition::REQUIRED;
-        break;
-    }
-    case TParquetRepetitionType::REPEATED: {
-        parquet_repetition_type = parquet::Repetition::REPEATED;
-        break;
-    }
-    case TParquetRepetitionType::OPTIONAL: {
-        parquet_repetition_type = parquet::Repetition::OPTIONAL;
-        break;
-    }
-    default:
-        parquet_repetition_type = parquet::Repetition::UNDEFINED;
-    }
-}
-
 void ParquetBuildHelper::build_compression_type(
         parquet::WriterProperties::Builder& builder,
         const TParquetCompressionType::type& compression_type) {
diff --git a/be/src/vec/runtime/vparquet_transformer.h 
b/be/src/vec/runtime/vparquet_transformer.h
index f958b18c4c0..7ae58aff74d 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -71,17 +71,10 @@ private:
 
 class ParquetBuildHelper {
 public:
-    static void build_schema_repetition_type(
-            parquet::Repetition::type& parquet_repetition_type,
-            const TParquetRepetitionType::type& column_repetition_type);
-
-    static void build_schema_data_type(parquet::Type::type& parquet_data_type,
-                                       const TParquetDataType::type& 
column_data_type);
-
-    static void build_compression_type(parquet::WriterProperties::Builder& 
builder,
+    static void build_compression_type(::parquet::WriterProperties::Builder& 
builder,
                                        const TParquetCompressionType::type& 
compression_type);
 
-    static void build_version(parquet::WriterProperties::Builder& builder,
+    static void build_version(::parquet::WriterProperties::Builder& builder,
                               const TParquetVersion::type& parquet_version);
 };
 
@@ -124,9 +117,9 @@ private:
     arrow::Status _open_file_writer();
 
     std::shared_ptr<ParquetOutputStream> _outstream;
-    std::shared_ptr<parquet::WriterProperties> _parquet_writer_properties;
-    std::shared_ptr<parquet::ArrowWriterProperties> _arrow_properties;
-    std::unique_ptr<parquet::arrow::FileWriter> _writer;
+    std::shared_ptr<::parquet::WriterProperties> _parquet_writer_properties;
+    std::shared_ptr<::parquet::ArrowWriterProperties> _arrow_properties;
+    std::unique_ptr<::parquet::arrow::FileWriter> _writer;
     std::shared_ptr<arrow::Schema> _arrow_schema;
 
     std::vector<std::string> _column_names;
diff --git a/be/test/olap/block_column_predicate_test.cpp 
b/be/test/olap/block_column_predicate_test.cpp
index f3ed6c6d689..480606ed394 100644
--- a/be/test/olap/block_column_predicate_test.cpp
+++ b/be/test/olap/block_column_predicate_test.cpp
@@ -29,15 +29,19 @@
 #include <utility>
 #include <vector>
 
+#include "exprs/hybrid_set.h"
 #include "gtest/gtest_pred_impl.h"
 #include "olap/column_predicate.h"
 #include "olap/comparison_predicate.h"
+#include "olap/in_list_predicate.h"
+#include "olap/null_predicate.h"
 #include "olap/tablet_schema.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/type_limit.h"
 #include "vec/columns/column.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/core/field.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/runtime/timestamptz_value.h"
 
 namespace doris {
@@ -1374,4 +1378,1005 @@ TEST_F(BlockColumnPredicateTest, 
test_timestamptz_bloom_filter) {
         single_column_predicate_test_func<TYPE_TIMESTAMPTZ, 
PredicateType::EQ>(bf.get(), tz, false);
     }
 }
+TEST_F(BlockColumnPredicateTest, PARQUET_COMPARISON_PREDICATE) {
+    { // INT
+     {// EQ
+      int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::EQ>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // get_stat_func fails: no stats are available, so nothing can be filtered out
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            return false;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // NE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::NE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // GE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::GE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // LE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::LE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 does not belong to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+} // namespace doris
+{
+    // FLOAT
+    {
+        // EQ
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::EQ>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get stat failed
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get min max failed
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = nanf("");
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // NE
+        float value = 5;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::NE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // GE
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::GE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // LE
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::LE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+}
+}
+
+TEST_F(BlockColumnPredicateTest, PARQUET_IN_PREDICATE) { // Exercises SingleColumnBlockPredicate::evaluate_and() for an IN-list predicate against mocked Parquet column statistics.
+    { // INT column; in both cases below the IN-list holds the single value 5
+        { // case 1: mocked min/max statistics contain the IN-list value
+            int value = 5;
+            int col_idx = 0;
+            auto hybrid_set = 
std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(false);
+            hybrid_set->insert(&value); // IN-list is now {5}
+            std::unique_ptr<ColumnPredicate> pred(
+                    new InListPredicateBase<TYPE_INT, PredicateType::IN_LIST,
+                                            
HybridSet<PrimitiveType::TYPE_INT>>(col_idx,
+                                                                               
 hybrid_set));
+            SingleColumnBlockPredicate single_column_block_pred(pred.get()); // non-owning: `pred` keeps ownership of the predicate
+            std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                    std::make_unique<vectorized::FieldSchema>();
+            parquet_field_col1->name = "col1";
+            parquet_field_col1->data_type =
+                    vectorized::DataTypeFactory::instance().create_data_type(
+                            PrimitiveType::TYPE_INT, true);
+            parquet_field_col1->field_id = -1;
+            parquet_field_col1->parquet_schema.type = 
tparquet::Type::type::INT32;
+
+            vectorized::ParquetPredicate::ColumnStat stat;
+            cctz::time_zone tmp_ctz;
+            stat.ctz = &tmp_ctz;
+
+            std::function<bool(vectorized::ParquetPredicate::ColumnStat*, 
int)> get_stat_func;
+            {
+                // Mocked stats: min == max == 5, so 5 lies in [5, 5]; evaluate_and() is expected to return true.
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    stat->col_schema = parquet_field_col1.get();
+                    stat->is_all_null = false;
+                    stat->has_null = false;
+                    auto tmp = std::string(reinterpret_cast<const 
char*>(&value), sizeof(value));
+                    stat->encoded_min_value = tmp;
+                    stat->encoded_max_value = tmp;
+                    return true;
+                };
+                stat.get_stat_func = &get_stat_func; // install the mock callback on the stat object
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+            {
+                // Get stat failed: the callback returns false without filling anything; evaluate_and() is expected to return true.
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    return false;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+        }
+        { // case 2: mocked min/max statistics do not contain the IN-list value
+            int value = 5;
+            int col_idx = 0;
+            auto hybrid_set = 
std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(false);
+            hybrid_set->insert(&value); // IN-list is now {5}
+            std::unique_ptr<ColumnPredicate> pred(
+                    new InListPredicateBase<TYPE_INT, PredicateType::IN_LIST,
+                                            
HybridSet<PrimitiveType::TYPE_INT>>(col_idx,
+                                                                               
 hybrid_set));
+            SingleColumnBlockPredicate single_column_block_pred(pred.get()); // non-owning: `pred` keeps ownership of the predicate
+            std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                    std::make_unique<vectorized::FieldSchema>();
+            parquet_field_col1->name = "col1";
+            parquet_field_col1->data_type =
+                    vectorized::DataTypeFactory::instance().create_data_type(
+                            PrimitiveType::TYPE_INT, true);
+            parquet_field_col1->field_id = -1;
+            parquet_field_col1->parquet_schema.type = 
tparquet::Type::type::INT32;
+
+            vectorized::ParquetPredicate::ColumnStat stat;
+            cctz::time_zone tmp_ctz;
+            stat.ctz = &tmp_ctz;
+
+            std::function<bool(vectorized::ParquetPredicate::ColumnStat*, 
int)> get_stat_func;
+            {
+                // Mocked stats: min == max == 6, so 5 is outside [6, 6]; evaluate_and() is expected to return false.
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    stat->col_schema = parquet_field_col1.get();
+                    stat->is_all_null = false;
+                    stat->has_null = false;
+                    int tmp_v = 6;
+                    auto tmp = std::string(reinterpret_cast<const 
char*>(&tmp_v), sizeof(tmp_v));
+                    stat->encoded_min_value = tmp;
+                    stat->encoded_max_value = tmp;
+                    return true;
+                };
+                stat.get_stat_func = &get_stat_func; // install the mock callback on the stat object
+                EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+            }
+            {
+                // Get stat failed: the callback returns false without filling anything; evaluate_and() is expected to return true.
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    return false;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+        }
+    }
+}
+
+TEST_F(BlockColumnPredicateTest, NULL_PREDICATE) { // Exercises SingleColumnBlockPredicate::evaluate_and() for NullPredicate against mocked Parquet column null statistics.
+    { // NullPredicate built with second argument `true` (presumably IS NULL; confirm against NullPredicate's constructor)
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(new NullPredicate(col_idx, 
true));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get()); // non-owning: `pred` keeps ownership of the predicate
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // Mocked stats: is_all_null = false and has_null = false (no min/max set); evaluate_and() is expected to return true.
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func; // install the mock callback on the stat object
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // Get stat failed: the callback returns false without filling anything; evaluate_and() is expected to return true.
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    { // NullPredicate built with second argument `false` (presumably IS NOT NULL; confirm against NullPredicate's constructor)
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(new NullPredicate(col_idx, 
false));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get()); // non-owning: `pred` keeps ownership of the predicate
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // Mocked stats: is_all_null = false and has_null = false; evaluate_and() is expected to return true.
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func; // install the mock callback on the stat object
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // Mocked stats: is_all_null = true (has_null left false); evaluate_and() is expected to return false.
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = true;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // Get stat failed: the callback returns false without filling anything; evaluate_and() is expected to return true.
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+}
+
+TEST_F(BlockColumnPredicateTest, COMBINED_PREDICATE) {
+    {
+        AndBlockColumnPredicate and_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate));
+        and_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(and_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        AndBlockColumnPredicate and_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate2;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate2 = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_TRUE(true_predicate2->evaluate_and(&stat));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate2));
+        EXPECT_TRUE(and_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(true_predicate));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_TRUE(or_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate2;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate2 = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(false_predicate2->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate2));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(or_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // Stats: min == max == 5 and no nulls, so no value can satisfy the `!= 5` predicate.
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(or_block_column_pred.evaluate_and(&stat));
+    }
+}
+
 } // namespace doris
diff --git a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
index 9ae3f05ceb8..53382889e44 100644
--- a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
@@ -411,7 +411,7 @@ TEST_F(ParquetExprTest, test_ne) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_FALSE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_FALSE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_eq) {
@@ -431,7 +431,7 @@ TEST_F(ParquetExprTest, test_eq) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_le) {
@@ -451,7 +451,7 @@ TEST_F(ParquetExprTest, test_le) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_ge) {
@@ -471,7 +471,7 @@ TEST_F(ParquetExprTest, test_ge) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_gt) {
@@ -491,7 +491,7 @@ TEST_F(ParquetExprTest, test_gt) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_lt) {
@@ -511,7 +511,7 @@ TEST_F(ParquetExprTest, test_lt) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001   [10000000000 
, 10000000000+3)
@@ -532,7 +532,7 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -547,7 +547,6 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -563,7 +562,6 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -585,7 +583,7 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -596,7 +594,6 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
             stat->has_null = false;
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -608,7 +605,6 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
             stat->has_null = false;
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -630,7 +626,7 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -645,7 +641,6 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -661,7 +656,6 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -682,7 +676,7 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -697,7 +691,6 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -713,7 +706,6 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -734,7 +726,7 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -749,7 +741,6 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -765,7 +756,6 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -999,10 +989,6 @@ TEST_F(ParquetExprTest, test_in) {
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr2, get_stat_func));
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr3, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr4, get_stat_func));
     }
 
     {
@@ -1018,10 +1004,6 @@ TEST_F(ParquetExprTest, test_in) {
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr, get_stat_func));
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr2, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr3, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr4, get_stat_func));
     }
 }
 
@@ -1054,8 +1036,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_le_int64) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_gt_float) {
@@ -1087,8 +1067,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_gt_float) {
         }
         return true;
     };
-
-    ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_ge_double) {
@@ -1120,8 +1098,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_ge_double) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_lt_string) {
@@ -1153,8 +1129,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_lt_string) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_eq_bool) {
@@ -1185,8 +1159,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_eq_bool) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_and) {
@@ -1265,12 +1237,17 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
         }
         return true;
     };
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(and_expr));
-    ASSERT_FALSE(p_reader->_expr_push_down(and_expr, get_stat_func));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(and_expr));
 
     p_reader->_enable_filter_by_min_max = true;
-    p_reader->_push_down_exprs.clear();
-    p_reader->_push_down_exprs.push_back(and_expr);
+    p_reader->_push_down_simple_predicates.clear();
+    p_reader->_push_down_simple_predicates.emplace(
+            2, std::vector<std::unique_ptr<ColumnPredicate>> {});
+    
p_reader->_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
+    ASSERT_TRUE(p_reader->convert_predicates({and_expr}, 
p_reader->_push_down_simple_predicates[2],
+                                             
p_reader->_push_down_predicates.back(),
+                                             p_reader->_arena)
+                        .ok());
 
     bool filter_group = false;
     
ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[0], 
&filter_group)
@@ -1280,7 +1257,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
     
ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[1], 
&filter_group)
                         .OK());
     ASSERT_TRUE(filter_group);
-    p_reader->_push_down_exprs.clear();
+    p_reader->_push_down_simple_predicates.clear();
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
@@ -1337,9 +1314,8 @@ TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
         }
         return true;
     };
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(or_expr));
-    ASSERT_FALSE(p_reader->_expr_push_down(or_expr, get_stat_func));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(or_expr));
 }
 
 } // namespace vectorized
-} // namespace doris
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to