This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 71486a2278a [refactor](parquet) Use column predicates to do filtering 
(#56904)
71486a2278a is described below

commit 71486a2278add78b5b6fdc57811c23351c8e04b0
Author: Gabriel <[email protected]>
AuthorDate: Mon Oct 27 17:38:39 2025 +0800

    [refactor](parquet) Use column predicates to do filtering (#56904)
---
 be/src/olap/block_column_predicate.h               |   29 +
 be/src/olap/column_predicate.h                     |   10 +
 be/src/olap/comparison_predicate.h                 |   47 +
 be/src/olap/in_list_predicate.h                    |   36 +
 be/src/olap/null_predicate.h                       |   11 +
 be/src/vec/core/field.cpp                          |   71 ++
 be/src/vec/core/field.h                            |    1 +
 be/src/vec/exec/format/generic_reader.cpp          |  252 +++++
 be/src/vec/exec/format/generic_reader.h            |   26 +
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  |   64 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  269 ++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   21 +-
 be/src/vec/runtime/vparquet_transformer.cpp        |   21 -
 be/src/vec/runtime/vparquet_transformer.h          |   17 +-
 be/test/olap/block_column_predicate_test.cpp       | 1006 ++++++++++++++++++++
 .../vec/exec/format/parquet/parquet_expr_test.cpp  |   69 +-
 16 files changed, 1587 insertions(+), 363 deletions(-)

diff --git a/be/src/olap/block_column_predicate.h 
b/be/src/olap/block_column_predicate.h
index 1c4507f1496..a8728a768fb 100644
--- a/be/src/olap/block_column_predicate.h
+++ b/be/src/olap/block_column_predicate.h
@@ -33,6 +33,7 @@
 #include "olap/column_predicate.h"
 #include "olap/olap_common.h"
 #include "vec/columns/column.h"
+#include "vec/exec/format/parquet/parquet_pred_cmp.h"
 
 namespace roaring {
 class Roaring;
@@ -79,6 +80,10 @@ public:
         throw Exception(Status::FatalError("should not reach here"));
     }
 
+    virtual bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* 
statistic) const {
+        throw Exception(Status::FatalError("should not reach here"));
+    }
+
     virtual bool evaluate_and(const segment_v2::BloomFilter* bf) const {
         throw Exception(Status::FatalError("should not reach here"));
     }
@@ -117,6 +122,9 @@ public:
                       bool* flags) const override;
     bool support_zonemap() const override { return 
_predicate->support_zonemap(); }
     bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override;
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        return _predicate->evaluate_and(statistic);
+    }
     bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
     bool evaluate_and(const StringRef* dict_words, const size_t dict_num) 
const override;
     void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, 
uint16_t selected_size,
@@ -180,6 +188,18 @@ public:
                       bool* flags) const override;
     void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, 
uint16_t selected_size,
                      bool* flags) const override;
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        if (num_of_column_predicate() == 1) {
+            return _block_column_predicate_vec[0]->evaluate_and(statistic);
+        } else {
+            for (int i = 0; i < num_of_column_predicate(); ++i) {
+                if (_block_column_predicate_vec[i]->evaluate_and(statistic)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
 
     // note(wb) we didnt't implement evaluate_vec method here, because storage 
layer only support AND predicate now;
 };
@@ -203,6 +223,15 @@ public:
 
     bool evaluate_and(const StringRef* dict_words, const size_t dict_num) 
const override;
 
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        for (auto& block_column_predicate : _block_column_predicate_vec) {
+            if (!block_column_predicate->evaluate_and(statistic)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     bool can_do_bloom_filter(bool ngram) const override {
         for (auto& pred : _block_column_predicate_vec) {
             if (!pred->can_do_bloom_filter(ngram)) {
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 372d71e7a81..a9060a99eb2 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -27,6 +27,7 @@
 #include "runtime/define_primitive_type.h"
 #include "util/runtime_profile.h"
 #include "vec/columns/column.h"
+#include "vec/exec/format/parquet/parquet_pred_cmp.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 
 using namespace doris::segment_v2;
@@ -223,6 +224,15 @@ public:
 
     virtual bool can_do_bloom_filter(bool ngram) const { return false; }
 
+    /**
+     * Figure out whether this page is matched partially or completely.
+     */
+    virtual bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* 
statistic) const {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "ParquetPredicate is not supported by this 
predicate!");
+        return true;
+    }
+
     // used to evaluate pre read column in lazy materialization
     // now only support integer/float
     // a vectorized eval way
diff --git a/be/src/olap/comparison_predicate.h 
b/be/src/olap/comparison_predicate.h
index ba3df21368f..600be2e60dd 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -167,6 +167,53 @@ public:
         }
     }
 
+    /**
+     * To figure out whether this page is matched partially or completely.
+     *
+     * 1. EQ: if `_value` belongs to the interval [min, max], return true to 
further compute each value in this page.
+     * 2. NE: return true to further compute each value in this page unless 
both min and max are equal to `_value`.
+     * 3. LT|LE: if `_value` is greater than or equal to min, return true to 
further compute each value in this page.
+     * 4. GT|GE: if `_value` is less than or equal to max, return true to 
further compute each value in this page.
+     */
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        vectorized::Field min_field;
+        vectorized::Field max_field;
+        if (!vectorized::ParquetPredicate::get_min_max_value(
+                     statistic->col_schema, statistic->encoded_min_value,
+                     statistic->encoded_max_value, *statistic->ctz, 
&min_field, &max_field)
+                     .ok()) {
+            return true;
+        };
+        T min_value;
+        T max_value;
+        if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
+            min_value =
+                    (typename PrimitiveTypeTraits<Type>::CppType)min_field
+                            .template get<typename 
PrimitiveTypeTraits<Type>::NearestFieldType>();
+            max_value =
+                    (typename PrimitiveTypeTraits<Type>::CppType)max_field
+                            .template get<typename 
PrimitiveTypeTraits<Type>::NearestFieldType>();
+        } else {
+            min_value = min_field.template get<typename 
PrimitiveTypeTraits<Type>::CppType>();
+            max_value = max_field.template get<typename 
PrimitiveTypeTraits<Type>::CppType>();
+        }
+
+        if constexpr (PT == PredicateType::EQ) {
+            return Compare::less_equal(min_value, _value) &&
+                   Compare::greater_equal(max_value, _value);
+        } else if constexpr (PT == PredicateType::NE) {
+            return !Compare::equal(min_value, _value) || 
!Compare::equal(max_value, _value);
+        } else if constexpr (PT == PredicateType::LT || PT == 
PredicateType::LE) {
+            return Compare::less_equal(min_value, _value);
+        } else {
+            static_assert(PT == PredicateType::GT || PT == PredicateType::GE);
+            return Compare::greater_equal(max_value, _value);
+        }
+    }
+
     bool is_always_true(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override {
         if (statistic.first->is_null() || statistic.second->is_null()) {
             return false;
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index 987084265be..f326098185d 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -287,6 +287,42 @@ public:
         }
     }
 
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        vectorized::Field min_field;
+        vectorized::Field max_field;
+        if (!vectorized::ParquetPredicate::get_min_max_value(
+                     statistic->col_schema, statistic->encoded_min_value,
+                     statistic->encoded_max_value, *statistic->ctz, 
&min_field, &max_field)
+                     .ok()) {
+            return true;
+        };
+        T min_value;
+        T max_value;
+        if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
+            min_value =
+                    (typename PrimitiveTypeTraits<Type>::CppType)min_field
+                            .template get<typename 
PrimitiveTypeTraits<Type>::NearestFieldType>();
+            max_value =
+                    (typename PrimitiveTypeTraits<Type>::CppType)max_field
+                            .template get<typename 
PrimitiveTypeTraits<Type>::NearestFieldType>();
+        } else {
+            min_value = min_field.template get<typename 
PrimitiveTypeTraits<Type>::CppType>();
+            max_value = max_field.template get<typename 
PrimitiveTypeTraits<Type>::CppType>();
+        }
+
+        if constexpr (PT == PredicateType::IN_LIST) {
+            return (Compare::less_equal(min_value, _max_value) &&
+                    Compare::greater_equal(max_value, _min_value)) ||
+                   (Compare::greater_equal(max_value, _min_value) &&
+                    Compare::less_equal(min_value, _max_value));
+        } else {
+            return true;
+        }
+    }
+
     bool evaluate_and(const StringRef* dict_words, const size_t count) const 
override {
         for (size_t i = 0; i != count; ++i) {
             const auto found = _values->find(dict_words[i].data, 
dict_words[i].size) ^ _opposite;
diff --git a/be/src/olap/null_predicate.h b/be/src/olap/null_predicate.h
index ffd05af86a1..af4dc366d25 100644
--- a/be/src/olap/null_predicate.h
+++ b/be/src/olap/null_predicate.h
@@ -70,6 +70,17 @@ public:
         }
     }
 
+    bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) 
const override {
+        if (!(*statistic->get_stat_func)(statistic, column_id())) {
+            return true;
+        }
+        if (_is_null) {
+            return true;
+        } else {
+            return !statistic->is_all_null;
+        }
+    }
+
     bool evaluate_del(const std::pair<WrapperField*, WrapperField*>& 
statistic) const override {
         // evaluate_del only use for delete condition to filter page, need use 
delete condition origin value,
         // when opposite==true, origin value 'is null'->'is not null' and 'is 
not null'->'is null',
diff --git a/be/src/vec/core/field.cpp b/be/src/vec/core/field.cpp
index 6fb0af9cec3..0ea3a686758 100644
--- a/be/src/vec/core/field.cpp
+++ b/be/src/vec/core/field.cpp
@@ -29,6 +29,7 @@
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/io/io_helper.h"
 #include "vec/io/var_int.h"
+#include "vec/runtime/vdatetime_value.h"
 
 namespace doris::vectorized {
 class BufferReadable;
@@ -764,6 +765,76 @@ std::string_view Field::as_string_view() const {
 
 #undef MATCH_PRIMITIVE_TYPE
 
+#define MATCH_PRIMITIVE_TYPE(primite_type)                                     
              \
+    if (type == primite_type) {                                                
              \
+        const auto& v = get<typename 
PrimitiveTypeTraits<primite_type>::NearestFieldType>(); \
+        return std::to_string(v);                                              
              \
+    }
+
+std::string Field::to_string() const {
+    if (type == PrimitiveType::TYPE_STRING || type == 
PrimitiveType::TYPE_VARCHAR ||
+        type == PrimitiveType::TYPE_CHAR) {
+        const auto& s = get<String>();
+        return {s.data(), s.size()};
+    }
+    if (type == TYPE_DECIMAL32) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL32>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL64) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL64>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMALV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMALV2>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL128I) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL128I>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_DECIMAL256) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DECIMAL256>::NearestFieldType>();
+        return v.get_value().to_string(v.get_scale());
+    }
+    if (type == TYPE_LARGEINT) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_LARGEINT>::NearestFieldType>();
+        return int128_to_string(v);
+    }
+    if (type == TYPE_DATE || type == TYPE_DATETIME) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATE>::NearestFieldType>();
+        std::string buf(40, 0);
+        auto* to = binary_cast<int64_t, 
doris::VecDateTimeValue>(v).to_string(buf.data());
+        buf.resize(to - buf.data() - 1);
+        return buf;
+    }
+    if (type == TYPE_DATEV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATEV2>::NearestFieldType>();
+        return binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>((uint32_t)v).to_string();
+    }
+    if (type == TYPE_DATETIMEV2) {
+        const auto& v = get<typename 
PrimitiveTypeTraits<TYPE_DATETIMEV2>::NearestFieldType>();
+        return binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(v).to_string();
+    }
+    MATCH_PRIMITIVE_TYPE(TYPE_BOOLEAN);
+    MATCH_PRIMITIVE_TYPE(TYPE_TINYINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_SMALLINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_INT);
+    MATCH_PRIMITIVE_TYPE(TYPE_BIGINT);
+    MATCH_PRIMITIVE_TYPE(TYPE_FLOAT);
+    MATCH_PRIMITIVE_TYPE(TYPE_DOUBLE);
+    MATCH_PRIMITIVE_TYPE(TYPE_TIME);
+    MATCH_PRIMITIVE_TYPE(TYPE_TIMEV2);
+    //    MATCH_PRIMITIVE_TYPE(TYPE_IPV4);
+    //    MATCH_PRIMITIVE_TYPE(TYPE_IPV6);
+    MATCH_PRIMITIVE_TYPE(TYPE_UINT32);
+    MATCH_PRIMITIVE_TYPE(TYPE_UINT64);
+    throw Exception(
+            Status::FatalError("type not supported for to_string, type={}", 
get_type_name()));
+}
+
+#undef MATCH_PRIMITIVE_TYPE
+
 #define DECLARE_FUNCTION(FUNC_NAME)                                            
              \
     template void Field::FUNC_NAME<TYPE_NULL>(                                 
              \
             typename PrimitiveTypeTraits<TYPE_NULL>::NearestFieldType && rhs); 
              \
diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h
index 1131ecfc4f7..60919c1277e 100644
--- a/be/src/vec/core/field.h
+++ b/be/src/vec/core/field.h
@@ -484,6 +484,7 @@ public:
     }
 
     std::string_view as_string_view() const;
+    std::string to_string() const;
 
 private:
     std::aligned_union_t<
diff --git a/be/src/vec/exec/format/generic_reader.cpp 
b/be/src/vec/exec/format/generic_reader.cpp
new file mode 100644
index 00000000000..8b3339faede
--- /dev/null
+++ b/be/src/vec/exec/format/generic_reader.cpp
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/generic_reader.h"
+
+#include "olap/predicate_creator.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/exprs/vtopn_pred.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status ExprPushDownHelper::_extract_predicates(const VExprSPtr& expr, int& cid,
+                                               DataTypePtr& data_type, 
std::vector<Field>& values,
+                                               bool null_pred, bool& parsed) 
const {
+    parsed = false;
+    values.clear();
+    if (!expr->children()[0]->is_slot_ref()) [[unlikely]] {
+        return Status::OK();
+    }
+    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
+    cid = slot_ref->column_id();
+    values.reserve(expr->children().size() - 1);
+    data_type = remove_nullable(slot_ref->data_type());
+    if (null_pred) {
+        DCHECK_EQ(expr->children().size(), 1);
+        parsed = true;
+    }
+    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
+        auto child_expr = expr->children()[child_id];
+        if (!child_expr->is_literal()) {
+            return Status::OK();
+        }
+        const auto* literal = static_cast<const VLiteral*>(child_expr.get());
+        if (literal->get_column_ptr()->is_null_at(0)) {
+            continue;
+        }
+        values.emplace_back(literal->get_column_ptr()->operator[](0));
+        parsed = true;
+    }
+    return Status::OK();
+}
+
+Status ExprPushDownHelper::convert_predicates(
+        const VExprSPtrs& exprs, 
std::vector<std::unique_ptr<ColumnPredicate>>& predicates,
+        std::unique_ptr<MutilColumnBlockPredicate>& root, Arena& arena) {
+    if (exprs.empty()) {
+        return Status::OK();
+    }
+
+    int cid;
+    DataTypePtr data_type;
+    std::vector<Field> values;
+    bool parsed = false;
+    for (const auto& expr : exprs) {
+        cid = -1;
+        values.clear();
+        parsed = false;
+        switch (expr->node_type()) {
+        case TExprNodeType::BINARY_PRED: {
+            decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* 
create = nullptr;
+            if (expr->op() == TExprOpcode::EQ) {
+                create = create_comparison_predicate<PredicateType::EQ>;
+            } else if (expr->op() == TExprOpcode::NE) {
+                create = create_comparison_predicate<PredicateType::NE>;
+            } else if (expr->op() == TExprOpcode::LT) {
+                create = create_comparison_predicate<PredicateType::LT>;
+            } else if (expr->op() == TExprOpcode::LE) {
+                create = create_comparison_predicate<PredicateType::LE>;
+            } else if (expr->op() == TExprOpcode::GT) {
+                create = create_comparison_predicate<PredicateType::GT>;
+            } else if (expr->op() == TExprOpcode::GE) {
+                create = create_comparison_predicate<PredicateType::GE>;
+            } else {
+                break;
+            }
+            RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, values, 
false, parsed));
+            if (parsed) {
+                // TODO(gabriel): Use string view
+                predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                        create(data_type, cid, values[0].to_string(), false, 
arena)));
+                root->add_column_predicate(
+                        
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+            }
+            break;
+        }
+        case TExprNodeType::IN_PRED: {
+            switch (expr->op()) {
+            case TExprOpcode::FILTER_IN: {
+                RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, 
values, false, parsed));
+                if (parsed) {
+                    // TODO(gabriel): Use string view
+                    std::vector<std::string> conditions(values.size());
+                    for (size_t i = 0; i < conditions.size(); i++) {
+                        conditions[i] = values[i].to_string();
+                    }
+                    predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                            create_list_predicate<PredicateType::IN_LIST>(
+                                    data_type, cid, conditions, false, 
arena)));
+                    root->add_column_predicate(
+                            
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+                }
+                break;
+            }
+            default: {
+                break;
+            }
+            }
+            break;
+        }
+        case TExprNodeType::COMPOUND_PRED: {
+            switch (expr->op()) {
+            case TExprOpcode::COMPOUND_AND: {
+                for (const auto& child : expr->children()) {
+                    RETURN_IF_ERROR(convert_predicates({child}, predicates, 
root, arena));
+                }
+                break;
+            }
+            case TExprOpcode::COMPOUND_OR: {
+                std::unique_ptr<MutilColumnBlockPredicate> new_root =
+                        OrBlockColumnPredicate::create_unique();
+                for (const auto& child : expr->children()) {
+                    RETURN_IF_ERROR(convert_predicates({child}, predicates, 
new_root, arena));
+                }
+                root->add_column_predicate(std::move(new_root));
+                break;
+            }
+            default: {
+                break;
+            }
+            }
+            break;
+        }
+        case TExprNodeType::FUNCTION_CALL: {
+            auto fn_name = expr->fn().name.function_name;
+            // only support `is null` and `is not null`
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, 
values, true, parsed));
+                if (parsed) {
+                    predicates.push_back(std::unique_ptr<ColumnPredicate>(
+                            new NullPredicate(cid, true, fn_name == 
"is_not_null_pred")));
+                    root->add_column_predicate(
+                            
SingleColumnBlockPredicate::create_unique(predicates.back().get()));
+                }
+            }
+            break;
+        }
+        default:
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+bool ExprPushDownHelper::check_expr_can_push_down(const VExprSPtr& expr) const 
{
+    if (expr == nullptr) {
+        return false;
+    }
+
+    switch (expr->node_type()) {
+    case TExprNodeType::BINARY_PRED:
+    case TExprNodeType::IN_PRED: {
+        switch (expr->op()) {
+        case TExprOpcode::GE:
+        case TExprOpcode::GT:
+        case TExprOpcode::LE:
+        case TExprOpcode::LT:
+        case TExprOpcode::EQ:
+        case TExprOpcode::FILTER_IN:
+            return _check_slot_can_push_down(expr) && 
_check_other_children_is_literal(expr);
+        default: {
+            return false;
+        }
+        }
+    }
+    case TExprNodeType::COMPOUND_PRED: {
+        switch (expr->op()) {
+        case TExprOpcode::COMPOUND_AND: {
+            // at least one child can be pushed down
+            return std::ranges::any_of(expr->children(), [this](const auto& 
child) {
+                return check_expr_can_push_down(child);
+            });
+        }
+        case TExprOpcode::COMPOUND_OR: {
+            // all children must be pushed down
+            return std::ranges::all_of(expr->children(), [this](const auto& 
child) {
+                return check_expr_can_push_down(child);
+            });
+        }
+        default: {
+            return false;
+        }
+        }
+    }
+    case TExprNodeType::FUNCTION_CALL: {
+        auto fn_name = expr->fn().name.function_name;
+        // only support `is null` and `is not null`
+        if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+            return _check_slot_can_push_down(expr);
+        }
+        return false;
+    }
+    default: {
+        return false;
+    }
+    }
+}
+
+bool ExprPushDownHelper::_check_slot_can_push_down(const VExprSPtr& expr) 
const {
+    if (!expr->children()[0]->is_slot_ref()) {
+        return false;
+    }
+
+    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
+    // check if the slot exists in parquet file.
+    if (!_exists_in_file(slot_ref)) {
+        return false;
+    }
+    return _type_matches(slot_ref);
+}
+
+bool ExprPushDownHelper::_check_other_children_is_literal(const VExprSPtr& 
expr) const {
+    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
+        auto child_expr = expr->children()[child_id];
+        if (!child_expr->is_literal()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index 6c1c12781d2..b21971b7a3f 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -20,15 +20,21 @@
 #include <gen_cpp/PlanNodes_types.h>
 
 #include "common/status.h"
+#include "olap/block_column_predicate.h"
 #include "runtime/descriptors.h"
 #include "runtime/types.h"
 #include "util/profile_collector.h"
 #include "vec/exprs/vexpr_fwd.h"
 
+namespace doris {
+class ColumnPredicate;
+} // namespace doris
+
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
 class Block;
+class VSlotRef;
 // This a reader interface for all file readers.
 // A GenericReader is responsible for reading a file and return
 // a set of blocks with specified schema,
@@ -106,5 +112,25 @@ protected:
     FileMetaCache* _meta_cache = nullptr;
 };
 
+class ExprPushDownHelper {
+public:
+    ExprPushDownHelper() = default;
+    virtual ~ExprPushDownHelper() = default;
+    bool check_expr_can_push_down(const VExprSPtr& expr) const;
+    Status convert_predicates(const VExprSPtrs& exprs,
+                              std::vector<std::unique_ptr<ColumnPredicate>>& 
predicates,
+                              std::unique_ptr<MutilColumnBlockPredicate>& 
root, Arena& arena);
+
+protected:
+    virtual bool _exists_in_file(const VSlotRef*) const = 0;
+    virtual bool _type_matches(const VSlotRef*) const = 0;
+
+private:
+    bool _check_slot_can_push_down(const VExprSPtr& expr) const;
+    bool _check_other_children_is_literal(const VExprSPtr& expr) const;
+    Status _extract_predicates(const VExprSPtr& expr, int& cid, DataTypePtr& 
data_type,
+                               std::vector<Field>& values, bool null_pred, 
bool& parsed) const;
+};
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h 
b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 3126616d3e3..8f27b675207 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -156,17 +156,9 @@ public:
         std::string encoded_max_value;
         bool has_null;
         bool is_all_null;
-    };
-
-    enum OP {
-        EQ,
-        LT,
-        LE,
-        GT,
-        GE,
-        IS_NULL,
-        IS_NOT_NULL,
-        IN,
+        const FieldSchema* col_schema;
+        const cctz::time_zone* ctz;
+        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* 
get_stat_func;
     };
 
     static Status get_min_max_value(const FieldSchema* col_schema, const 
std::string& encoded_min,
@@ -395,56 +387,6 @@ public:
 
         return Status::OK();
     }
-
-    static bool check_can_filter(OP op, const std::vector<Field>& 
literal_values,
-                                 const ColumnStat& column_stat, const 
FieldSchema* col_schema,
-                                 const cctz::time_zone* ctz) {
-        Field min_field;
-        Field max_field;
-        if (!ParquetPredicate::get_min_max_value(col_schema, 
column_stat.encoded_min_value,
-                                                 
column_stat.encoded_max_value, *ctz, &min_field,
-                                                 &max_field)) {
-            return false;
-        };
-
-        switch (op) {
-        case ParquetPredicate::OP::EQ:
-        case ParquetPredicate::OP::IN: {
-            for (const auto& in_value : literal_values) {
-                if (in_value.is_null() && column_stat.has_null) {
-                    return false;
-                }
-                if (min_field <= in_value && in_value <= max_field) {
-                    return false;
-                }
-            }
-            return true;
-        }
-        case ParquetPredicate::OP::LT: {
-            DCHECK(!literal_values[0].is_null());
-            return min_field >= literal_values[0];
-        }
-        case ParquetPredicate::OP::LE: {
-            DCHECK(!literal_values[0].is_null());
-            return min_field > literal_values[0];
-        }
-        case ParquetPredicate::OP::GT: {
-            DCHECK(!literal_values[0].is_null());
-            return max_field <= literal_values[0];
-        }
-        case ParquetPredicate::OP::GE: {
-            DCHECK(!literal_values[0].is_null());
-            return max_field < literal_values[0];
-        }
-        case ParquetPredicate::OP::IS_NULL: {
-            return !column_stat.has_null;
-        }
-        case ParquetPredicate::OP::IS_NOT_NULL: {
-            return column_stat.is_all_null;
-        }
-        }
-        return false;
-    }
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 9a4ac82c5d2..71650932847 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -365,71 +365,11 @@ Status ParquetReader::init_reader(
     return Status::OK();
 }
 
-bool ParquetReader::_check_expr_can_push_down(const VExprSPtr& expr) {
-    if (expr == nullptr) {
-        return false;
-    }
-
-    switch (expr->node_type()) {
-    case TExprNodeType::BINARY_PRED:
-    case TExprNodeType::IN_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::GE:
-        case TExprOpcode::GT:
-        case TExprOpcode::LE:
-        case TExprOpcode::LT:
-        case TExprOpcode::EQ:
-        case TExprOpcode::FILTER_IN:
-            return _check_slot_can_push_down(expr) && 
_check_other_children_is_literal(expr);
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::COMPOUND_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::COMPOUND_AND: {
-            // at least one child can be pushed down
-            return std::ranges::any_of(expr->children(), [this](const auto& 
child) {
-                return _check_expr_can_push_down(child);
-            });
-        }
-        case TExprOpcode::COMPOUND_OR: {
-            // all children must be pushed down
-            return std::ranges::all_of(expr->children(), [this](const auto& 
child) {
-                return _check_expr_can_push_down(child);
-            });
-        }
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::FUNCTION_CALL: {
-        auto fn_name = expr->fn().name.function_name;
-        // only support `is null` and `is not null`
-        if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
-            return _check_slot_can_push_down(expr);
-        }
-        return false;
-    }
-    default: {
-        return false;
-    }
-    }
+bool ParquetReader::_exists_in_file(const VSlotRef* slot_ref) const {
+    return _table_info_node_ptr->children_column_exists(slot_ref->expr_name());
 }
 
-bool ParquetReader::_check_slot_can_push_down(const VExprSPtr& expr) {
-    if (!expr->children()[0]->is_slot_ref()) {
-        return false;
-    }
-
-    const auto* slot_ref = assert_cast<const 
VSlotRef*>(expr->children()[0].get());
-    // check if the slot exists in parquet file.
-    if (!_table_info_node_ptr->children_column_exists(slot_ref->expr_name())) {
-        return false;
-    }
-
+bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
     auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
     auto table_col_type = remove_nullable(slot->type());
 
@@ -437,126 +377,8 @@ bool ParquetReader::_check_slot_can_push_down(const 
VExprSPtr& expr) {
     const auto& file_col_type =
             
remove_nullable(_file_metadata->schema().get_column(file_col_name)->data_type);
 
-    // If a schema change occurs, the min and max values of the parquet file 
cannot be guaranteed to be valid for the current table.
-    if (table_col_type->get_primitive_type() != 
file_col_type->get_primitive_type() ||
-        is_complex_type(table_col_type->get_primitive_type())) {
-        return false;
-    }
-
-    return true;
-}
-
-bool ParquetReader::_check_other_children_is_literal(const VExprSPtr& expr) {
-    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
-        auto child_expr = expr->children()[child_id];
-        if (!child_expr->is_literal()) {
-            return false;
-        }
-    }
-    return true;
-}
-
-// Although we have already checked whether the children of expr are slots and 
literals in `_check_expr_can_push_down`, we still need to check again here
-// because the existence of the AND predicate will cause only some children to 
be checked.
-bool ParquetReader::_simple_expr_push_down(
-        const VExprSPtr& expr, ParquetPredicate::OP op,
-        const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                get_stat_func) {
-    if (!expr->children()[0]->is_slot_ref()) [[unlikely]] {
-        return false;
-    }
-    const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
-
-    std::vector<Field> literal_values(expr->children().size() - 1);
-    for (size_t child_id = 1; child_id < expr->children().size(); child_id++) {
-        auto child_expr = expr->children()[child_id];
-        if (!child_expr->is_literal()) {
-            return false;
-        }
-        const auto* literal = static_cast<const VLiteral*>(child_expr.get());
-        if (literal->get_column_ptr()->is_null_at(0)) {
-            continue;
-        }
-        literal_values[child_id - 1] = 
literal->get_column_ptr()->operator[](0);
-    }
-
-    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
-    if (!_table_info_node_ptr->children_column_exists(slot->col_name())) {
-        return false;
-    }
-
-    const auto& file_col_name = 
_table_info_node_ptr->children_file_column_name(slot->col_name());
-    const FieldSchema* col_schema = 
_file_metadata->schema().get_column(file_col_name);
-
-    ParquetPredicate::ColumnStat column_stat;
-    if (!get_stat_func(col_schema, &column_stat)) {
-        return false;
-    }
-
-    return ParquetPredicate::check_can_filter(op, literal_values, column_stat, 
col_schema, _ctz);
-}
-
-bool ParquetReader::_expr_push_down(
-        const VExprSPtr& expr,
-        const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                get_stat_func) {
-    if (expr == nullptr) {
-        return false;
-    }
-
-    switch (expr->node_type()) {
-    case TExprNodeType::BINARY_PRED:
-    case TExprNodeType::IN_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::GE:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::GE, 
get_stat_func);
-        case TExprOpcode::GT:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::GT, 
get_stat_func);
-        case TExprOpcode::LE:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::LE, 
get_stat_func);
-        case TExprOpcode::LT:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::LT, 
get_stat_func);
-        case TExprOpcode::EQ:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::EQ, 
get_stat_func);
-        case TExprOpcode::FILTER_IN:
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::IN, 
get_stat_func);
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::COMPOUND_PRED: {
-        switch (expr->op()) {
-        case TExprOpcode::COMPOUND_AND: {
-            return std::ranges::any_of(expr->children(), [&](const auto& 
child) {
-                return _expr_push_down(child, get_stat_func);
-            });
-        }
-        case TExprOpcode::COMPOUND_OR: {
-            return std::ranges::all_of(expr->children(), [&](const auto& 
child) {
-                return _expr_push_down(child, get_stat_func);
-            });
-        }
-        default: {
-            return false;
-        }
-        }
-    }
-    case TExprNodeType::FUNCTION_CALL: {
-        auto fn_name = expr->fn().name.function_name;
-        // only support `is null` and `is not null`
-        if (fn_name == "is_null_pred") {
-            return _simple_expr_push_down(expr, ParquetPredicate::OP::IS_NULL, 
get_stat_func);
-        } else if (fn_name == "is_not_null_pred") {
-            return _simple_expr_push_down(expr, 
ParquetPredicate::OP::IS_NOT_NULL, get_stat_func);
-        }
-        return false;
-    }
-    default: {
-        return false;
-    }
-    }
-    return false;
+    return (table_col_type->get_primitive_type() == 
file_col_type->get_primitive_type()) &&
+           !is_complex_type(table_col_type->get_primitive_type());
 }
 
 Status ParquetReader::set_fill_columns(
@@ -637,15 +459,21 @@ Status ParquetReader::set_fill_columns(
             visit_slot(expr.get());
         }
 
-        if (_check_expr_can_push_down(expr)) {
-            _push_down_exprs.emplace_back(expr);
+        if (check_expr_can_push_down(expr)) {
+            
_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
             if (expr->node_type() != TExprNodeType::COMPOUND_PRED) {
                 // for page index filter.
                 VSlotRef* slot_ref = 
static_cast<VSlotRef*>(expr->children()[0].get());
-                if (!_push_down_simple_expr.contains(slot_ref->slot_id())) {
-                    _push_down_simple_expr.emplace(slot_ref->slot_id(), 
VExprSPtrs {});
+                if 
(!_push_down_simple_predicates.contains(slot_ref->slot_id())) {
+                    _push_down_simple_predicates.emplace(
+                            slot_ref->slot_id(), 
std::vector<std::unique_ptr<ColumnPredicate>> {});
                 }
-                _push_down_simple_expr[slot_ref->slot_id()].emplace_back(expr);
+                RETURN_IF_ERROR(convert_predicates(
+                        {expr}, 
_push_down_simple_predicates[slot_ref->slot_id()],
+                        _push_down_predicates.back(), _arena));
+            } else {
+                RETURN_IF_ERROR(convert_predicates({expr}, _useless_predicates,
+                                                   
_push_down_predicates.back(), _arena));
             }
         }
     }
@@ -1089,7 +917,6 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
             continue;
         }
         auto& chunk = row_group.columns[parquet_col_id];
-
         if (chunk.offset_index_length == 0) {
             continue;
         }
@@ -1097,10 +924,11 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         RETURN_IF_ERROR(page_index.parse_offset_index(chunk, 
off_index_buff.data(), &offset_index));
         _col_offsets[parquet_col_id] = offset_index;
 
-        if (!_push_down_simple_expr.contains(slot_id)) {
+        if (!_push_down_simple_predicates.contains(slot_id) ||
+            _push_down_simple_predicates[slot_id].empty()) {
             continue;
         }
-        const auto& push_down_expr = _push_down_simple_expr[slot_id];
+        const auto& predicates = _push_down_simple_predicates[slot_id];
 
         if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
             continue;
@@ -1118,12 +946,21 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
 
         for (int page_id = 0; page_id < num_of_pages; page_id++) {
-            std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)> get_stat_func =
-                    [&](const FieldSchema* col_schema, 
ParquetPredicate::ColumnStat* stat) {
+            std::function<bool(ParquetPredicate::ColumnStat*, int)> 
get_stat_func =
+                    [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                        auto* slot = _tuple_descriptor->slots()[cid];
+                        if 
(!_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                            return false;
+                        }
                         if (!column_index.__isset.null_counts) {
                             return false;
                         }
 
+                        const auto& file_col_name =
+                                
_table_info_node_ptr->children_file_column_name(slot->col_name());
+                        const FieldSchema* col_schema =
+                                
_file_metadata->schema().get_column(file_col_name);
+                        stat->col_schema = col_schema;
                         stat->is_all_null = column_index.null_pages[page_id];
                         stat->has_null = column_index.null_counts[page_id] > 0;
                         stat->encoded_min_value = encoded_min_vals[page_id];
@@ -1131,9 +968,13 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
                         return true;
                     };
 
-            for (const auto& expr : push_down_expr) {
-                if (_expr_push_down(expr, get_stat_func)) {
+            ParquetPredicate::ColumnStat stat;
+            stat.ctz = _ctz;
+            stat.get_stat_func = &get_stat_func;
+            for (const auto& predicate : predicates) {
+                if (!predicate->evaluate_and(&stat)) {
                     skipped_page_range.emplace_back(page_id);
+                    break;
                 }
             }
         }
@@ -1223,20 +1064,32 @@ Status ParquetReader::_process_column_stat_filter(const 
tparquet::RowGroup& row_
         return Status::OK();
     }
 
-    std::function<bool(const FieldSchema*, ParquetPredicate::ColumnStat*)> 
get_stat_func =
-            [&](const FieldSchema* col_schema, ParquetPredicate::ColumnStat* 
stat) {
-                int parquet_col_id = col_schema->physical_column_index;
-                auto meta_data = row_group.columns[parquet_col_id].meta_data;
-
-                return ParquetPredicate::read_column_stats(col_schema, 
meta_data, &_ignored_stats,
-                                                           
_t_metadata->created_by, stat)
-                        .ok();
-            };
-
-    for (auto expr : _push_down_exprs) {
-        if (_expr_push_down(expr, get_stat_func)) {
+    for (const auto& predicate : _push_down_predicates) {
+        std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+                [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                    auto* slot = _tuple_descriptor->slots()[cid];
+                    if 
(!_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                        return false;
+                    }
+                    const auto& file_col_name =
+                            
_table_info_node_ptr->children_file_column_name(slot->col_name());
+                    const FieldSchema* col_schema =
+                            _file_metadata->schema().get_column(file_col_name);
+                    int parquet_col_id = col_schema->physical_column_index;
+                    auto meta_data = 
row_group.columns[parquet_col_id].meta_data;
+                    stat->col_schema = col_schema;
+                    return ParquetPredicate::read_column_stats(col_schema, 
meta_data,
+                                                               &_ignored_stats,
+                                                               
_t_metadata->created_by, stat)
+                            .ok();
+                };
+        ParquetPredicate::ColumnStat stat;
+        stat.ctz = _ctz;
+        stat.get_stat_func = &get_stat_func;
+
+        if (!predicate->evaluate_and(&stat)) {
             *filter_group = true;
-            break;
+            return Status::OK();
         }
     }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 3b9a6b04db9..5fdc7d6bf24 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -70,7 +70,7 @@ class VExprContext;
 
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-class ParquetReader : public GenericReader {
+class ParquetReader : public GenericReader, public ExprPushDownHelper {
     ENABLE_FACTORY_CREATOR(ParquetReader);
 
 public:
@@ -233,16 +233,8 @@ private:
 
     Status _set_read_one_line_impl() override { return Status::OK(); }
 
-    bool _expr_push_down(const VExprSPtr& expr,
-                         const std::function<bool(const FieldSchema*,
-                                                  
ParquetPredicate::ColumnStat*)>& get_stat_func);
-    bool _simple_expr_push_down(
-            const VExprSPtr& expr, ParquetPredicate::OP op,
-            const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
-                    get_stat_func);
-    bool _check_expr_can_push_down(const VExprSPtr& expr);
-    bool _check_slot_can_push_down(const VExprSPtr& expr);
-    bool _check_other_children_is_literal(const VExprSPtr& expr);
+    bool _exists_in_file(const VSlotRef* slot) const override;
+    bool _type_matches(const VSlotRef*) const override;
 
     RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams& _scan_params;
@@ -324,11 +316,12 @@ private:
                                                                                
            -1};
 
     bool _filter_groups;
-    // push down =, >, <, >=, <=, in
-    VExprSPtrs _push_down_exprs;
 
     // for page index filter. slot id => expr
-    std::map<int, VExprSPtrs> _push_down_simple_expr;
+    std::map<int, std::vector<std::unique_ptr<ColumnPredicate>>> 
_push_down_simple_predicates;
+    std::vector<std::unique_ptr<MutilColumnBlockPredicate>> 
_push_down_predicates;
+    std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
+    Arena _arena;
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index cfd556d70e5..e9f5549b510 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -98,27 +98,6 @@ void ParquetOutputStream::set_written_len(int64_t 
written_len) {
     _written_len = written_len;
 }
 
-void ParquetBuildHelper::build_schema_repetition_type(
-        parquet::Repetition::type& parquet_repetition_type,
-        const TParquetRepetitionType::type& column_repetition_type) {
-    switch (column_repetition_type) {
-    case TParquetRepetitionType::REQUIRED: {
-        parquet_repetition_type = parquet::Repetition::REQUIRED;
-        break;
-    }
-    case TParquetRepetitionType::REPEATED: {
-        parquet_repetition_type = parquet::Repetition::REPEATED;
-        break;
-    }
-    case TParquetRepetitionType::OPTIONAL: {
-        parquet_repetition_type = parquet::Repetition::OPTIONAL;
-        break;
-    }
-    default:
-        parquet_repetition_type = parquet::Repetition::UNDEFINED;
-    }
-}
-
 void ParquetBuildHelper::build_compression_type(
         parquet::WriterProperties::Builder& builder,
         const TParquetCompressionType::type& compression_type) {
diff --git a/be/src/vec/runtime/vparquet_transformer.h 
b/be/src/vec/runtime/vparquet_transformer.h
index f958b18c4c0..7ae58aff74d 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -71,17 +71,10 @@ private:
 
 class ParquetBuildHelper {
 public:
-    static void build_schema_repetition_type(
-            parquet::Repetition::type& parquet_repetition_type,
-            const TParquetRepetitionType::type& column_repetition_type);
-
-    static void build_schema_data_type(parquet::Type::type& parquet_data_type,
-                                       const TParquetDataType::type& 
column_data_type);
-
-    static void build_compression_type(parquet::WriterProperties::Builder& 
builder,
+    static void build_compression_type(::parquet::WriterProperties::Builder& 
builder,
                                        const TParquetCompressionType::type& 
compression_type);
 
-    static void build_version(parquet::WriterProperties::Builder& builder,
+    static void build_version(::parquet::WriterProperties::Builder& builder,
                               const TParquetVersion::type& parquet_version);
 };
 
@@ -124,9 +117,9 @@ private:
     arrow::Status _open_file_writer();
 
     std::shared_ptr<ParquetOutputStream> _outstream;
-    std::shared_ptr<parquet::WriterProperties> _parquet_writer_properties;
-    std::shared_ptr<parquet::ArrowWriterProperties> _arrow_properties;
-    std::unique_ptr<parquet::arrow::FileWriter> _writer;
+    std::shared_ptr<::parquet::WriterProperties> _parquet_writer_properties;
+    std::shared_ptr<::parquet::ArrowWriterProperties> _arrow_properties;
+    std::unique_ptr<::parquet::arrow::FileWriter> _writer;
     std::shared_ptr<arrow::Schema> _arrow_schema;
 
     std::vector<std::string> _column_names;
diff --git a/be/test/olap/block_column_predicate_test.cpp 
b/be/test/olap/block_column_predicate_test.cpp
index 0a5c726cc6e..936ccfe8ace 100644
--- a/be/test/olap/block_column_predicate_test.cpp
+++ b/be/test/olap/block_column_predicate_test.cpp
@@ -29,14 +29,18 @@
 #include <utility>
 #include <vector>
 
+#include "exprs/hybrid_set.h"
 #include "gtest/gtest_pred_impl.h"
 #include "olap/column_predicate.h"
 #include "olap/comparison_predicate.h"
+#include "olap/in_list_predicate.h"
+#include "olap/null_predicate.h"
 #include "olap/tablet_schema.h"
 #include "runtime/define_primitive_type.h"
 #include "vec/columns/column.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/core/field.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 
 namespace doris {
 
@@ -1200,4 +1204,1006 @@ TEST_F(BlockColumnPredicateTest, 
test_double_single_column_predicate) {
         }
     }
 }
+
+TEST_F(BlockColumnPredicateTest, PARQUET_COMPARISON_PREDICATE) {
+    { // INT
+     {// EQ
+      int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::EQ>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // get stat failed
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            return false;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // NE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::NE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // GE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::GE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+{
+    // LE
+    int value = 5;
+    int col_idx = 0;
+    std::unique_ptr<ColumnPredicate> pred(
+            new ComparisonPredicateBase<TYPE_INT, PredicateType::LE>(col_idx, 
value));
+    SingleColumnBlockPredicate single_column_block_pred(pred.get());
+    std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+            std::make_unique<vectorized::FieldSchema>();
+    parquet_field_col1->name = "col1";
+    parquet_field_col1->data_type =
+            
vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT,
 true);
+    parquet_field_col1->field_id = -1;
+    parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+    vectorized::ParquetPredicate::ColumnStat stat;
+    cctz::time_zone tmp_ctz;
+    stat.ctz = &tmp_ctz;
+
+    std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+    {
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [6, 7]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 6;
+            int upper = 7;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+    }
+    {
+        // 5 not belongs to [1, 4]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            int lower = 1;
+            int upper = 4;
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            stat->encoded_min_value =
+                    std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+            stat->encoded_max_value =
+                    std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+    }
+}
+} // namespace doris
+{
+    // FLOAT
+    {
+        // EQ
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::EQ>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get stat failed
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get min max failed
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = nanf("");
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // NE
+        float value = 5;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::NE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // GE
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::GE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    {
+        // LE
+        float value = 5.0;
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_FLOAT, 
PredicateType::LE>(col_idx, value));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_FLOAT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::FLOAT;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // 5 belongs to [5, 5]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+                stat->encoded_min_value = tmp;
+                stat->encoded_max_value = tmp;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [6, 7]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 6.0;
+                float upper = 7.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // 5 not belongs to [1, 4]
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                float lower = 1.0;
+                float upper = 4.0;
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                stat->encoded_min_value =
+                        std::string(reinterpret_cast<const char*>(&lower), 
sizeof(lower));
+                stat->encoded_max_value =
+                        std::string(reinterpret_cast<const char*>(&upper), 
sizeof(upper));
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+}
+}
+
+TEST_F(BlockColumnPredicateTest, PARQUET_IN_PREDICATE) { // evaluate_and() of an INT IN-list predicate against ParquetPredicate::ColumnStat min/max
+    { // INT
+        { // IN-list holds 5; the stat callback reports min == max == 5
+            int value = 5;
+            int col_idx = 0;
+            auto hybrid_set = 
std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(false);
+            hybrid_set->insert(&value);
+            std::unique_ptr<ColumnPredicate> pred(
+                    new InListPredicateBase<TYPE_INT, PredicateType::IN_LIST,
+                                            
HybridSet<PrimitiveType::TYPE_INT>>(col_idx,
+                                                                               
 hybrid_set));
+            SingleColumnBlockPredicate single_column_block_pred(pred.get());
+            std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                    std::make_unique<vectorized::FieldSchema>();
+            parquet_field_col1->name = "col1";
+            parquet_field_col1->data_type =
+                    vectorized::DataTypeFactory::instance().create_data_type(
+                            PrimitiveType::TYPE_INT, true);
+            parquet_field_col1->field_id = -1;
+            parquet_field_col1->parquet_schema.type = 
tparquet::Type::type::INT32;
+
+            vectorized::ParquetPredicate::ColumnStat stat;
+            cctz::time_zone tmp_ctz;
+            stat.ctz = &tmp_ctz;
+
+            std::function<bool(vectorized::ParquetPredicate::ColumnStat*, 
int)> get_stat_func;
+            {
+                // 5 belongs to [5, 5]: min and max are both the raw bytes of `value`; expects true
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    stat->col_schema = parquet_field_col1.get();
+                    stat->is_all_null = false;
+                    stat->has_null = false;
+                    auto tmp = std::string(reinterpret_cast<const 
char*>(&value), sizeof(value));
+                    stat->encoded_min_value = tmp;
+                    stat->encoded_max_value = tmp;
+                    return true;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+            {
+                // get stat failed: the callback returns false without filling `stat`; expects true
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    return false;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+        }
+        { // IN-list holds 5; the stat callback reports min == max == 6
+            int value = 5;
+            int col_idx = 0;
+            auto hybrid_set = 
std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(false);
+            hybrid_set->insert(&value);
+            std::unique_ptr<ColumnPredicate> pred(
+                    new InListPredicateBase<TYPE_INT, PredicateType::IN_LIST,
+                                            
HybridSet<PrimitiveType::TYPE_INT>>(col_idx,
+                                                                               
 hybrid_set));
+            SingleColumnBlockPredicate single_column_block_pred(pred.get());
+            std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                    std::make_unique<vectorized::FieldSchema>();
+            parquet_field_col1->name = "col1";
+            parquet_field_col1->data_type =
+                    vectorized::DataTypeFactory::instance().create_data_type(
+                            PrimitiveType::TYPE_INT, true);
+            parquet_field_col1->field_id = -1;
+            parquet_field_col1->parquet_schema.type = 
tparquet::Type::type::INT32;
+
+            vectorized::ParquetPredicate::ColumnStat stat;
+            cctz::time_zone tmp_ctz;
+            stat.ctz = &tmp_ctz;
+
+            std::function<bool(vectorized::ParquetPredicate::ColumnStat*, 
int)> get_stat_func;
+            {
+                // 5 not belongs to [6, 6]: min and max are both the raw bytes of tmp_v (6); expects false
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    stat->col_schema = parquet_field_col1.get();
+                    stat->is_all_null = false;
+                    stat->has_null = false;
+                    int tmp_v = 6;
+                    auto tmp = std::string(reinterpret_cast<const 
char*>(&tmp_v), sizeof(tmp_v));
+                    stat->encoded_min_value = tmp;
+                    stat->encoded_max_value = tmp;
+                    return true;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+            }
+            {
+                // get stat failed: the callback returns false without filling `stat`; expects true
+                get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                    return false;
+                };
+                stat.get_stat_func = &get_stat_func;
+                EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+            }
+        }
+    }
+}
+
+TEST_F(BlockColumnPredicateTest, NULL_PREDICATE) { // evaluate_and() of NullPredicate against ParquetPredicate::ColumnStat null flags
+    { // NullPredicate built with second argument `true` (presumably IS NULL -- confirm against NullPredicate)
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(new NullPredicate(col_idx, 
true));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // stats available with is_all_null = false and has_null = false (no min/max set); expects true
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get stat failed: the callback returns false without filling `stat`; expects true
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+    { // NullPredicate built with second argument `false` (presumably IS NOT NULL -- confirm against NullPredicate)
+        int col_idx = 0;
+        std::unique_ptr<ColumnPredicate> pred(new NullPredicate(col_idx, 
false));
+        SingleColumnBlockPredicate single_column_block_pred(pred.get());
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        {
+            // stats available with is_all_null = false and has_null = false (no min/max set); expects true
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = false;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // stats available with is_all_null = true (has_null left false); expects false
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                stat->col_schema = parquet_field_col1.get();
+                stat->is_all_null = true;
+                stat->has_null = false;
+                return true;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat));
+        }
+        {
+            // get stat failed: the callback returns false without filling `stat`; expects true
+            get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* 
stat, const int cid) {
+                return false;
+            };
+            stat.get_stat_func = &get_stat_func;
+            EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat));
+        }
+    }
+}
+
+TEST_F(BlockColumnPredicateTest, COMBINED_PREDICATE) {
+    {
+        AndBlockColumnPredicate and_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate));
+        and_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(and_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        AndBlockColumnPredicate and_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate2;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate2 = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_TRUE(true_predicate2->evaluate_and(&stat));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate));
+        and_block_column_pred.add_column_predicate(std::move(true_predicate2));
+        EXPECT_TRUE(and_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> true_predicate;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::EQ>(col_idx, value));
+        true_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_TRUE(true_predicate->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(true_predicate));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_TRUE(or_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate2;
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<ColumnPredicate> pred(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate2 = 
std::make_unique<SingleColumnBlockPredicate>(pred.get());
+
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(false_predicate2->evaluate_and(&stat));
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate2));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(or_block_column_pred.evaluate_and(&stat));
+    }
+    {
+        OrBlockColumnPredicate or_block_column_pred;
+
+        int col_idx = 0;
+        int value = 5;
+        std::unique_ptr<SingleColumnBlockPredicate> false_predicate;
+        std::unique_ptr<ColumnPredicate> pred2(
+                new ComparisonPredicateBase<TYPE_INT, 
PredicateType::NE>(col_idx, value));
+        false_predicate = 
std::make_unique<SingleColumnBlockPredicate>(pred2.get());
+
+        std::unique_ptr<vectorized::FieldSchema> parquet_field_col1 =
+                std::make_unique<vectorized::FieldSchema>();
+        parquet_field_col1->name = "col1";
+        parquet_field_col1->data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                PrimitiveType::TYPE_INT, true);
+        parquet_field_col1->field_id = -1;
+        parquet_field_col1->parquet_schema.type = tparquet::Type::type::INT32;
+
+        vectorized::ParquetPredicate::ColumnStat stat;
+        cctz::time_zone tmp_ctz;
+        stat.ctz = &tmp_ctz;
+
+        std::function<bool(vectorized::ParquetPredicate::ColumnStat*, int)> 
get_stat_func;
+        // 5 belongs to [5, 5]
+        get_stat_func = [&](vectorized::ParquetPredicate::ColumnStat* stat, 
const int cid) {
+            stat->col_schema = parquet_field_col1.get();
+            stat->is_all_null = false;
+            stat->has_null = false;
+            auto tmp = std::string(reinterpret_cast<const char*>(&value), 
sizeof(value));
+            stat->encoded_min_value = tmp;
+            stat->encoded_max_value = tmp;
+            return true;
+        };
+        stat.get_stat_func = &get_stat_func;
+        EXPECT_FALSE(false_predicate->evaluate_and(&stat));
+        or_block_column_pred.add_column_predicate(std::move(false_predicate));
+        EXPECT_FALSE(or_block_column_pred.evaluate_and(&stat));
+    }
+}
+
 } // namespace doris
diff --git a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
index d61773ca7cc..5aa7a4b0b10 100644
--- a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp
@@ -409,7 +409,7 @@ TEST_F(ParquetExprTest, test_ne) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_FALSE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_FALSE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_eq) {
@@ -429,7 +429,7 @@ TEST_F(ParquetExprTest, test_eq) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_le) {
@@ -449,7 +449,7 @@ TEST_F(ParquetExprTest, test_le) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_ge) {
@@ -469,7 +469,7 @@ TEST_F(ParquetExprTest, test_ge) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_gt) {
@@ -489,7 +489,7 @@ TEST_F(ParquetExprTest, test_gt) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_lt) {
@@ -509,7 +509,7 @@ TEST_F(ParquetExprTest, test_lt) {
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 }
 
 TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001   [10000000000 
, 10000000000+3)
@@ -530,7 +530,7 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -545,7 +545,6 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -561,7 +560,6 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 
10000000001   [10000000000 ,
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -583,7 +581,7 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
     auto ctx = VExprContext::create_shared(fn_eq);
     ctx->_prepared = true;
     ctx->_opened = true;
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -594,7 +592,6 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
             stat->has_null = false;
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -606,7 +603,6 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
             stat->has_null = false;
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -628,7 +624,7 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -643,7 +639,6 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -659,7 +654,6 @@ TEST_F(ParquetExprTest, test_is_null) { // 
int32_all_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -680,7 +674,7 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -695,7 +689,6 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -711,7 +704,6 @@ TEST_F(ParquetExprTest, test_is_not_null) { // 
int32_all_null_col is not null
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -732,7 +724,7 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
     ctx->_prepared = true;
     ctx->_opened = true;
 
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(ctx->root()));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(ctx->root()));
 
     {
         const std::function<bool(const FieldSchema*, 
ParquetPredicate::ColumnStat*)>&
@@ -747,7 +739,6 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 
     {
@@ -763,7 +754,6 @@ TEST_F(ParquetExprTest, test_is_null_2) { // 
int32_partial_null_col is null
             }
             return true;
         };
-        ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
     }
 }
 
@@ -997,10 +987,6 @@ TEST_F(ParquetExprTest, test_in) {
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr2, get_stat_func));
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr3, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr4, get_stat_func));
     }
 
     {
@@ -1016,10 +1002,6 @@ TEST_F(ParquetExprTest, test_in) {
             }
             return true;
         };
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr, get_stat_func));
-        ASSERT_TRUE(p_reader->_expr_push_down(in_expr2, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr3, get_stat_func));
-        ASSERT_FALSE(p_reader->_expr_push_down(in_expr4, get_stat_func));
     }
 }
 
@@ -1052,8 +1034,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_le_int64) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_gt_float) {
@@ -1085,8 +1065,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_gt_float) {
         }
         return true;
     };
-
-    ASSERT_TRUE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_ge_double) {
@@ -1118,8 +1096,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_ge_double) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_lt_string) {
@@ -1151,8 +1127,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_lt_string) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_eq_bool) {
@@ -1183,8 +1157,6 @@ TEST_F(ParquetExprTest, test_expr_push_down_eq_bool) {
         }
         return true;
     };
-
-    ASSERT_FALSE(p_reader->_expr_push_down(ctx->root(), get_stat_func));
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_and) {
@@ -1263,12 +1235,16 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
         }
         return true;
     };
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(and_expr));
-    ASSERT_FALSE(p_reader->_expr_push_down(and_expr, get_stat_func));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(and_expr));
 
     p_reader->_enable_filter_by_min_max = true;
-    p_reader->_push_down_exprs.clear();
-    p_reader->_push_down_exprs.push_back(and_expr);
+    p_reader->_push_down_simple_predicates.clear();
+    p_reader->_push_down_simple_predicates.emplace(
+            2, std::vector<std::unique_ptr<ColumnPredicate>> {});
+    p_reader->_push_down_predicates.emplace(2, 
AndBlockColumnPredicate::create_unique());
+    ASSERT_TRUE(p_reader->convert_predicates({and_expr}, 
p_reader->_push_down_simple_predicates[2],
+                                             
p_reader->_push_down_predicates[2], p_reader->_arena)
+                        .ok());
 
     bool filter_group = false;
     
ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[0], 
&filter_group)
@@ -1278,7 +1254,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
     
ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[1], 
&filter_group)
                         .OK());
     ASSERT_TRUE(filter_group);
-    p_reader->_push_down_exprs.clear();
+    p_reader->_push_down_simple_predicates.clear();
 }
 
 TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
@@ -1335,9 +1311,8 @@ TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
         }
         return true;
     };
-    ASSERT_TRUE(p_reader->_check_expr_can_push_down(or_expr));
-    ASSERT_FALSE(p_reader->_expr_push_down(or_expr, get_stat_func));
+    ASSERT_TRUE(p_reader->check_expr_can_push_down(or_expr));
 }
 
 } // namespace vectorized
-} // namespace doris
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to