This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f551462  feat(parquet): introduce stats extraction and predicate 
pushdown (#53)
f551462 is described below

commit f5514628dceb195479110cb6bed14b8ae8bfc09f
Author: Zhang Jiawei <[email protected]>
AuthorDate: Mon Jun 8 11:58:49 2026 +0800

    feat(parquet): introduce stats extraction and predicate pushdown (#53)
---
 .../format/parquet/parquet_stats_extractor.cpp     | 340 +++++++++
 .../format/parquet/parquet_stats_extractor.h       |  82 +++
 .../parquet/parquet_stats_extractor_test.cpp       | 351 +++++++++
 src/paimon/format/parquet/predicate_converter.cpp  | 300 ++++++++
 src/paimon/format/parquet/predicate_converter.h    |  74 ++
 .../format/parquet/predicate_converter_test.cpp    | 396 ++++++++++
 .../format/parquet/predicate_pushdown_test.cpp     | 813 +++++++++++++++++++++
 7 files changed, 2356 insertions(+)

diff --git a/src/paimon/format/parquet/parquet_stats_extractor.cpp 
b/src/paimon/format/parquet/parquet_stats_extractor.cpp
new file mode 100644
index 0000000..8f8b82a
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor.cpp
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+
+#include <cassert>
+#include <optional>
+#include <string_view>
+#include <unordered_map>
+
+#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/parquet/parquet_schema_util.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+
+namespace paimon {
+class MemoryPool;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+namespace {
+
+template <typename ParquetTypedStatsType, typename R = typename 
ParquetTypedStatsType::T>
+std::pair<std::optional<R>, std::optional<R>> CollectMinMaxStats(
+    const std::shared_ptr<ParquetTypedStatsType>& typed_stats) {
+    std::optional<R> min;
+    std::optional<R> max;
+    if (typed_stats && typed_stats->HasMinMax()) {
+        min = typed_stats->min();
+        max = typed_stats->max();
+    }
+    return std::make_pair(min, max);
+}
+
+Result<std::unique_ptr<ColumnStats>> ConvertStatsToColumnStats(
+    const std::shared_ptr<::parquet::Statistics>& stats,
+    const std::shared_ptr<::parquet::schema::PrimitiveNode>& primitive_node,
+    const std::shared_ptr<arrow::DataType>& write_type, const 
std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::DataType> 
data_type,
+                                      GetArrowType(*primitive_node));
+    auto id = data_type->id();
+
+    std::optional<int64_t> null_count;
+    if (stats && stats->HasNullCount()) {
+        null_count = stats->null_count();
+    }
+    switch (id) {
+        case arrow::Type::BOOL: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::BoolStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateBooleanColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT8: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            std::optional<int8_t> min;
+            std::optional<int8_t> max;
+            if (typed_stats && typed_stats->HasMinMax()) {
+                min = static_cast<int8_t>(typed_stats->min());
+                max = static_cast<int8_t>(typed_stats->max());
+            }
+            return ColumnStats::CreateTinyIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT16: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            std::optional<int16_t> min;
+            std::optional<int16_t> max;
+            if (typed_stats && typed_stats->HasMinMax()) {
+                min = static_cast<int16_t>(typed_stats->min());
+                max = static_cast<int16_t>(typed_stats->max());
+            }
+            return ColumnStats::CreateSmallIntColumnStats(min, max, 
null_count);
+        }
+        case arrow::Type::INT32: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT64: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateBigIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::FLOAT: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::FloatStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateFloatColumnStats(min, max, null_count);
+        }
+        case arrow::Type::DOUBLE: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::DoubleStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateDoubleColumnStats(min, max, null_count);
+        }
+        case arrow::Type::STRING: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::ByteArrayStatistics>(stats);
+            std::optional<std::string> min;
+            std::optional<std::string> max;
+            if (typed_stats && typed_stats->HasMinMax()) {
+                min = std::string(std::string_view{typed_stats->min()});
+                max = std::string(std::string_view{typed_stats->max()});
+            }
+            return ColumnStats::CreateStringColumnStats(min, max, null_count);
+        }
+        case arrow::Type::BINARY: {
+            return ColumnStats::CreateStringColumnStats(std::nullopt, 
std::nullopt, null_count);
+        }
+        case arrow::Type::DATE32: {
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = 
CollectMinMaxStats<::parquet::Int32Statistics>(typed_stats);
+            return ColumnStats::CreateDateColumnStats(min, max, null_count);
+        }
+        case arrow::Type::TIMESTAMP: {
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<::arrow::TimestampType>(data_type);
+            if (timestamp_type->unit() == arrow::TimeUnit::type::NANO) {
+                // int96 does not have statistics
+                return ColumnStats::CreateTimestampColumnStats(
+                    std::nullopt, std::nullopt, std::nullopt, 
Timestamp::MAX_PRECISION);
+            }
+            auto typed_stats =
+                
arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            // while write type is ts(second), data type in parquet file will 
be ts(milli), correct
+            // precision is supposed to be extracted from write type
+            auto write_ts_type =
+                
arrow::internal::checked_pointer_cast<::arrow::TimestampType>(write_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(write_ts_type);
+            if (!min || !max) {
+                return ColumnStats::CreateTimestampColumnStats(std::nullopt, 
std::nullopt,
+                                                               null_count, 
precision);
+            }
+            auto src_time_type = 
DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+            auto [milli_min, nano_min] = DateTimeUtils::TimestampConverter(
+                min.value(), src_time_type, 
DateTimeUtils::TimeType::MILLISECOND,
+                DateTimeUtils::TimeType::NANOSECOND);
+            auto [milli_max, nano_max] = DateTimeUtils::TimestampConverter(
+                max.value(), src_time_type, 
DateTimeUtils::TimeType::MILLISECOND,
+                DateTimeUtils::TimeType::NANOSECOND);
+            return 
ColumnStats::CreateTimestampColumnStats(Timestamp(milli_min, nano_min),
+                                                           
Timestamp(milli_max, nano_max),
+                                                           null_count, 
precision);
+        }
+        case arrow::Type::DECIMAL128: {
+            auto decimal_type =
+                
arrow::internal::checked_pointer_cast<::arrow::Decimal128Type>(data_type);
+            int32_t precision = decimal_type->precision();
+            int32_t scale = decimal_type->scale();
+            std::optional<Decimal> min_value;
+            std::optional<Decimal> max_value;
+            if (primitive_node->physical_type() == ::parquet::Type::INT32) {
+                auto typed_stats =
+                    
arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+                if (typed_stats && typed_stats->HasMinMax()) {
+                    min_value = Decimal(precision, scale, typed_stats->min());
+                    max_value = Decimal(precision, scale, typed_stats->max());
+                }
+            } else if (primitive_node->physical_type() == 
::parquet::Type::INT64) {
+                auto typed_stats =
+                    
arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+                if (typed_stats && typed_stats->HasMinMax()) {
+                    min_value = Decimal(precision, scale, typed_stats->min());
+                    max_value = Decimal(precision, scale, typed_stats->max());
+                }
+            } else if (primitive_node->physical_type() == 
::parquet::Type::FIXED_LEN_BYTE_ARRAY ||
+                       primitive_node->physical_type() == 
::parquet::Type::BYTE_ARRAY) {
+                if (stats && stats->HasMinMax()) {
+                    Bytes encode_min(stats->EncodeMin(), pool.get());
+                    min_value = Decimal::FromUnscaledBytes(precision, scale, 
&encode_min);
+                    Bytes encode_max(stats->EncodeMax(), pool.get());
+                    max_value = Decimal::FromUnscaledBytes(precision, scale, 
&encode_max);
+                }
+            }
+            return ColumnStats::CreateDecimalColumnStats(min_value, max_value, 
null_count,
+                                                         precision, scale);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("cannot fetch statistics, invalid type {}", 
data_type->ToString()));
+    }
+}
+
+template <typename T>
+void MergeTypedStats(
+    const std::string& column_name, const 
std::shared_ptr<::parquet::Statistics>& stats,
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>>* 
merged_stats) {
+    auto& entry = (*merged_stats)[column_name];
+    if (!entry) {
+        entry = stats;
+    } else {
+        arrow::internal::checked_pointer_cast<T>(entry)->Merge(
+            *arrow::internal::checked_pointer_cast<T>(stats));
+    }
+}
+
+Status MergeStats(
+    const std::string& column_name, const 
std::shared_ptr<::parquet::Statistics>& stats,
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>>* 
merged_stats) {
+    switch (stats->physical_type()) {
+        case ::parquet::Type::BOOLEAN:
+            MergeTypedStats<::parquet::BoolStatistics>(column_name, stats, 
merged_stats);
+            break;
+        case ::parquet::Type::INT32:
+            MergeTypedStats<::parquet::Int32Statistics>(column_name, stats, 
merged_stats);
+            break;
+        case ::parquet::Type::INT64:
+            MergeTypedStats<::parquet::Int64Statistics>(column_name, stats, 
merged_stats);
+            break;
+        case ::parquet::Type::FLOAT:
+            MergeTypedStats<::parquet::FloatStatistics>(column_name, stats, 
merged_stats);
+            break;
+        case ::parquet::Type::DOUBLE:
+            MergeTypedStats<::parquet::DoubleStatistics>(column_name, stats, 
merged_stats);
+            break;
+        case ::parquet::Type::BYTE_ARRAY:
+            MergeTypedStats<::parquet::ByteArrayStatistics>(column_name, 
stats, merged_stats);
+            break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
+            MergeTypedStats<::parquet::FLBAStatistics>(column_name, stats, 
merged_stats);
+            break;
+        default:
+            return Status::Invalid(fmt::format("Unsupported parquet type {} 
for statistics merge",
+                                               
::parquet::TypeToString(stats->physical_type())));
+    }
+    return Status::OK();
+}
+
+}  // namespace
+
+Result<std::pair<ColumnStatsVector, FormatStatsExtractor::FileInfo>>
+ParquetStatsExtractor::ExtractWithFileInfo(const std::shared_ptr<FileSystem>& 
file_system,
+                                           const std::string& path,
+                                           const std::shared_ptr<MemoryPool>& 
pool) {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream, 
file_system->Open(path));
+    assert(input_stream);
+    PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length());
+    std::shared_ptr<arrow::MemoryPool> parquet_memory_pool = 
GetArrowPool(pool);
+    auto parquet_input_file = std::make_shared<ArrowInputStreamAdapter>(
+        std::move(input_stream), parquet_memory_pool, file_length);
+    ::parquet::ReaderProperties read_properties(parquet_memory_pool.get());
+    read_properties.enable_buffered_stream();
+    ::parquet::arrow::FileReaderBuilder file_reader_builder;
+    
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(parquet_input_file, 
read_properties));
+
+    std::shared_ptr<::parquet::FileMetaData> file_metadata =
+        file_reader_builder.raw_reader()->metadata();
+    int32_t field_count = file_metadata->schema()->group_node()->field_count();
+
+    ColumnStatsVector result_stats;
+    result_stats.reserve(field_count);
+
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>> 
merged_stats;
+
+    for (int32_t row_group_idx = 0; row_group_idx < 
file_metadata->num_row_groups();
+         ++row_group_idx) {
+        for (int32_t col_idx = 0; col_idx < file_metadata->num_columns(); 
++col_idx) {
+            auto column_chunk = 
file_metadata->RowGroup(row_group_idx)->ColumnChunk(col_idx);
+            if (!column_chunk->is_stats_set()) {
+                continue;
+            }
+            auto stats = column_chunk->statistics();
+            std::string column_name = 
column_chunk->path_in_schema()->ToDotString();
+            PAIMON_RETURN_NOT_OK(MergeStats(column_name, stats, 
&merged_stats));
+        }
+    }
+
+    for (int32_t field_idx = 0; field_idx < field_count; ++field_idx) {
+        auto node = file_metadata->schema()->group_node()->field(field_idx);
+        if (node->is_group()) {
+            // nested type do not have parquet stats
+            const auto& logical_type = node->logical_type();
+            FieldType nested_type = FieldType::UNKNOWN;
+            if (logical_type->is_list()) {
+                nested_type = FieldType::ARRAY;
+            } else if (logical_type->is_map()) {
+                nested_type = FieldType::MAP;
+            } else if (logical_type->is_none()) {
+                nested_type = FieldType::STRUCT;
+            }
+            
result_stats.push_back(ColumnStats::CreateNestedColumnStats(nested_type, 
std::nullopt));
+        } else {
+            auto primitive_node =
+                
arrow::internal::checked_pointer_cast<::parquet::schema::PrimitiveNode>(node);
+            assert(primitive_node != nullptr);
+            auto iter = merged_stats.find(node->name());
+            const std::shared_ptr<::parquet::Statistics>& parquet_stats =
+                iter == merged_stats.end() ? nullptr : iter->second;
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<ColumnStats> col_stats,
+                ConvertStatsToColumnStats(parquet_stats, primitive_node,
+                                          
write_schema_->field(field_idx)->type(), pool));
+            result_stats.push_back(col_stats);
+        }
+    }
+    return std::make_pair(std::move(result_stats), 
FileInfo(file_metadata->num_rows()));
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_stats_extractor.h 
b/src/paimon/format/parquet/parquet_stats_extractor.h
new file mode 100644
index 0000000..0fc852b
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/format_stats_extractor.h"
+#include "paimon/format/parquet/parquet_schema_util.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "parquet/metadata.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class FileSystem;
+class MemoryPool;
+}  // namespace paimon
+
+namespace parquet::schema {
+class GroupNode;
+class Node;
+class PrimitiveNode;
+}  // namespace parquet::schema
+
+namespace paimon::parquet {
+
+class ParquetStatsExtractor : public FormatStatsExtractor {
+ public:
+    explicit ParquetStatsExtractor(const std::shared_ptr<arrow::Schema>& 
write_schema)
+        : write_schema_(write_schema) {}
+
+    Result<ColumnStatsVector> Extract(const std::shared_ptr<FileSystem>& 
file_system,
+                                      const std::string& path,
+                                      const std::shared_ptr<MemoryPool>& pool) 
override {
+        PAIMON_ASSIGN_OR_RAISE(auto result, ExtractWithFileInfo(file_system, 
path, pool));
+        return result.first;
+    }
+
+    Result<std::pair<ColumnStatsVector, FileInfo>> ExtractWithFileInfo(
+        const std::shared_ptr<FileSystem>& file_system, const std::string& 
path,
+        const std::shared_ptr<MemoryPool>& pool) override;
+
+ private:
+    void PrintConvertedType(const ::parquet::schema::Node* node);
+
+    void Visit(const ::parquet::schema::Node* node);
+    void Visit(const ::parquet::schema::PrimitiveNode* node);
+    void Visit(const ::parquet::schema::GroupNode* node);
+
+ private:
+    int64_t row_count_ = -1;
+    std::shared_ptr<arrow::Schema> write_schema_;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_stats_extractor_test.cpp 
b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp
new file mode 100644
index 0000000..4dc7fbe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+
+#include <cstddef>
+#include <map>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/compare.h"
+#include "arrow/io/file.h"
+#include "arrow/ipc/api.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/stats/simple_stats_converter.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace paimon::parquet::test {
+
+class ParquetStatsExtractorTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    void CheckStats(const arrow::FieldVector& fields, const std::string& input,
+                    const std::vector<std::string>& expected_stats, int64_t 
expect_row_count) {
+        auto arrow_schema = arrow::schema(fields);
+        auto struct_type = arrow::struct_(fields);
+        std::map<std::string, std::string> options;
+        std::shared_ptr<arrow::MemoryPool> pool = 
GetArrowPool(GetDefaultPool());
+        std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+        std::string file_name;
+        ASSERT_TRUE(UUID::Generate(&file_name));
+        std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs->Create(file_path, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.enable_store_decimal_as_integer();
+        ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create(
+                                                     out, arrow_schema, 
builder.build(),
+                                                     
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, pool));
+        auto array = arrow::ipc::internal::json::ArrayFromJSON(struct_type, 
input).ValueOrDie();
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        ASSERT_OK(format_writer->AddBatch(arrow_array.get()));
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+        auto write_schema = std::make_shared<arrow::Schema>(fields);
+        ParquetStatsExtractor stats_extractor(write_schema);
+        ASSERT_OK_AND_ASSIGN(auto result,
+                             stats_extractor.ExtractWithFileInfo(fs, 
file_path, GetDefaultPool()));
+        auto& col_stats_vec = result.first;
+        ASSERT_EQ(fields.size(), col_stats_vec.size());
+        ASSERT_EQ(col_stats_vec.size(), expected_stats.size());
+        for (size_t i = 0; i < expected_stats.size(); i++) {
+            ASSERT_EQ(expected_stats[i], col_stats_vec[i]->ToString());
+        }
+        auto row_count = result.second.GetRowCount();
+        ASSERT_EQ(row_count, expect_row_count);
+    }
+
+ private:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+};
+
+TEST_F(ParquetStatsExtractorTest, TestExtractStats) {
+    arrow::FieldVector fields = {
+        arrow::field("col0", arrow::struct_({arrow::field("col2", 
arrow::boolean()),
+                                             arrow::field("col3", 
arrow::int64())})),
+        arrow::field("col1", arrow::utf8()),
+        arrow::field("col2", arrow::int32()),
+        arrow::field("col3", arrow::boolean()),
+        arrow::field("col4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("col5", arrow::decimal128(23, 2)),
+        arrow::field("col6", arrow::float32()),
+        arrow::field("col7", arrow::float64()),
+    };
+    {
+        std::string data_str = R"([
+            [[true, 0], "str0", 100, true, "3970-01-01 00:00:00.000", "0.22", 
1.1, 12.2],
+            [[false, 1], "str1", 101, false, "3970-01-01 00:02:03.999", 
"0.28", 2.2, 12.2],
+            [[false, 2], "str2", 102, true, "3455-01-01 00:02:03.000", 
"1234567890123456.00", 3.3, 13.2]
+        ])";
+        std::vector<std::string> expected_stats_str = {
+            "min null, max null, null count null",
+            "min str0, max str2, null count 0",
+            "min 100, max 102, null count 0",
+            "min false, max true, null count 0",
+            "min null, max null, null count null",
+            "min 0.22, max 1234567890123456.00, null count 0",
+            "min 1.1, max 3.3, null count 0",
+            "min 12.2, max 13.2, null count 0",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/3);
+    }
+    {
+        std::string data_str = R"([
+            [[true, 0], "str0", 100, true, "3970-01-01 00:00:00.000", "0.22", 
1.1, 12.2],
+            [[false, 1], "str1", 101, true, "3970-01-01 00:02:03.999", "0.28", 
2.2, 12.2],
+            [[false, 2], "str2", 102, true, "3455-01-01 00:02:03.000", 
"1234567890123456.00", 3.3, 13.2]
+        ])";
+        std::vector<std::string> expected_stats_str = {
+            "min null, max null, null count null",
+            "min str0, max str2, null count 0",
+            "min 100, max 102, null count 0",
+            "min true, max true, null count 0",
+            "min null, max null, null count null",
+            "min 0.22, max 1234567890123456.00, null count 0",
+            "min 1.1, max 3.3, null count 0",
+            "min 12.2, max 13.2, null count 0",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/3);
+    }
+
+    {
+        std::string data_str = R"([
+            [[true, 0], "str0", 100, true, "1970-01-01 00:00:00.000", "0.22", 
1.1, 12.2],
+            [[false, 1], "str1", 101, false, "1970-01-01 00:02:03.999", 
"0.28", 2.2, 12.2],
+            [[false, 2], "str2", 102, true, "1985-01-01 00:02:03.000", "1.00", 
3.3, 13.2]
+        ])";
+        std::vector<std::string> expected_stats_str = {
+            "min null, max null, null count null", "min str0, max str2, null 
count 0",
+            "min 100, max 102, null count 0",      "min false, max true, null 
count 0",
+            "min null, max null, null count null", "min 0.22, max 1.00, null 
count 0",
+            "min 1.1, max 3.3, null count 0",      "min 12.2, max 13.2, null 
count 0",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/3);
+    }
+    {
+        std::string data_str = R"([
+            [[true, 0], null, 100, null, "3970-01-01 00:00:00.000", null, 
null, null],
+            [[false, 1], "str1", 101, null, "3970-01-01 00:02:03.999", null, 
null, 2.2],
+            [null, "str2", null, true, "3455-01-01 00:02:03.000", null, 1.1, 
3.3]
+        ])";
+        std::vector<std::string> expected_stats_str = {
+            "min null, max null, null count null", "min str1, max str2, null 
count 1",
+            "min 100, max 101, null count 1",      "min true, max true, null 
count 2",
+            "min null, max null, null count null", "min null, max null, null 
count 3",
+            "min 1.1, max 1.1, null count 2",      "min 2.2, max 3.3, null 
count 1",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/3);
+    }
+}
+
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsSimpleType) {
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()),       arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int16()),         arrow::field("f3", 
arrow::int32()),
+        arrow::field("field_null", arrow::int32()), arrow::field("f4", 
arrow::int64()),
+        arrow::field("f5", arrow::float32()),       arrow::field("f6", 
arrow::float64()),
+        arrow::field("f7", arrow::utf8()),          arrow::field("f8", 
arrow::binary())};
+
+    std::string data_str = R"([
+        [true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, 
"20250327", "banana"],
+        [false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, 
"20250327", "dog"],
+        [null, 1, 32767, 2147483647, null, null, 2.1, 3.141592657, null, 
"lucy"],
+        [true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, 
"20250326", null]
+    ])";
+    std::vector<std::string> expected_stats_str = {
+        "min false, max true, null count 1",
+        "min -2, max 1, null count 0",
+        "min -32768, max 32767, null count 0",
+        "min -2147483648, max 2147483647, null count 1",
+        "min null, max null, null count 4",
+        "min -4294967298, max 4294967296, null count 1",
+        "min 0.5, max 2.1, null count 0",
+        "min 1.141592659, max 3.141592657, null count 0",
+        "min 20250326, max 20250327, null count 1",
+        "min null, max null, null count 1",
+    };
+    CheckStats(fields, data_str, expected_stats_str, /*expect_row_count=*/4);
+}
+
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsComplexType) {
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())),
+        arrow::field("f2", arrow::list(arrow::float32())),
+        arrow::field("f3", arrow::struct_({arrow::field("f0", 
arrow::boolean()),
+                                           arrow::field("f1", 
arrow::int64())})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f5", arrow::date32()),
+        arrow::field("f6", arrow::decimal128(2, 2))};
+
+    std::string data_str = R"([
+        [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456, 
"0.22"],
+        [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999999", 24, 
"0.28"],
+        [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123123", 
2456, "0.22"],
+        [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], 
"1970-01-01 00:02:03.123123", 245, "0.12"],
+        [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 
24, "0.78"],
+        [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 
00:00:00.123123", 24, "0.78"]
+    ])";
+    std::vector<std::string> expected_stats_str = {
+        "min null, max null, null count null", "min null, max null, null count 
null",
+        "min null, max null, null count null", "min null, max null, null count 
null",
+        "min 24, max 2456, null count 0",      "min 0.12, max 0.78, null count 
0",
+    };
+    CheckStats(fields, data_str, expected_stats_str, /*expect_row_count=*/6);
+}
+
+TEST_F(ParquetStatsExtractorTest, TestNullForAllType) {
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()),
+        arrow::field("f1", arrow::int8()),
+        arrow::field("f2", arrow::int16()),
+        arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::int64()),
+        arrow::field("f5", arrow::float32()),
+        arrow::field("f6", arrow::float64()),
+        arrow::field("f7", arrow::utf8()),
+        arrow::field("f8", arrow::binary()),
+        arrow::field("f9", arrow::map(arrow::int8(), arrow::int16())),
+        arrow::field("f10", arrow::list(arrow::float32())),
+        arrow::field("f11", arrow::struct_({arrow::field("f0", 
arrow::boolean()),
+                                            arrow::field("f1", 
arrow::int64())})),
+        arrow::field("f12", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f13", arrow::date32()),
+        arrow::field("f14", arrow::decimal128(2, 2)),
+        arrow::field("f15", arrow::decimal128(30, 2)),
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone)),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
timezone)),
+    };
+    auto schema = std::make_shared<arrow::Schema>(fields);
+    std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+    std::string file_name = dir_->Str() + "/test.parquet";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs->Create(file_name, /*overwrite=*/false));
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::MemoryPool> arrow_pool = GetArrowPool(pool);
+    ::parquet::WriterProperties::Builder builder;
+    builder.enable_store_decimal_as_integer();
+    ASSERT_OK_AND_ASSIGN(
+        auto format_writer,
+        ParquetFormatWriter::Create(out, schema, builder.build(),
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool));
+    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        
[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]
+    ])")
+            .ValueOrDie());
+    ArrowArray c_array;
+    ASSERT_TRUE(arrow::ExportArray(*src_array, &c_array).ok());
+    ASSERT_OK(format_writer->AddBatch(&c_array));
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+
+    auto extractor = std::make_shared<ParquetStatsExtractor>(schema);
+    ASSERT_OK_AND_ASSIGN(auto ret, extractor->ExtractWithFileInfo(fs, 
file_name, pool));
+
+    auto column_stats = ret.first;
+    auto file_info = ret.second;
+    ASSERT_EQ(src_array->length(), file_info.GetRowCount());
+    ASSERT_OK_AND_ASSIGN(auto stats, 
SimpleStatsConverter::ToBinary(column_stats, pool.get()));
+    // test compatible with java
+    ASSERT_EQ(stats.min_values_.HashCode(), 0xf890741a);
+    ASSERT_EQ(stats.max_values_.HashCode(), 0xf890741a);
+}
+
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsTimestampType) {
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone)),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
timezone)),
+    };
+
+    {
+        std::string data_str = R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02", 
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01 
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null,                       
  "1970-01-01 00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 
00:00:00.004", "1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null,                       
  null,                            "1970-01-01 00:00:06",                     
null,  "1970-01-01 00:00:00.000006", null]
+    ])";
+        std::vector<std::string> expected_stats_str = {
+            "min 1970-01-01 00:00:01.000000000, max 1970-01-01 
00:00:05.000000000, null count 0",
+            "min 1970-01-01 00:00:00.001000000, max 1970-01-01 
00:00:00.005000000, null count 0",
+            "min 1970-01-01 00:00:00.000001000, max 1970-01-01 
00:00:00.000001000, null count 2",
+            "min null, max null, null count null",
+            "min 1970-01-01 00:00:02.000000000, max 1970-01-01 
00:00:06.000000000, null count 0",
+            "min 1970-01-01 00:00:00.002000000, max 1970-01-01 
00:00:00.004000000, null count 1",
+            "min 1970-01-01 00:00:00.000002000, max 1970-01-01 
00:00:00.000006000, null count 0",
+            "min null, max null, null count null",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/3);
+    }
+    {
+        std::string data_str = R"([
+         [null,null,null,null,null,null,null,null]
+    ])";
+        std::vector<std::string> expected_stats_str = {
+            "min null, max null, null count 1", "min null, max null, null 
count 1",
+            "min null, max null, null count 1", "min null, max null, null 
count null",
+            "min null, max null, null count 1", "min null, max null, null 
count 1",
+            "min null, max null, null count 1", "min null, max null, null 
count null",
+        };
+        CheckStats(fields, data_str, expected_stats_str, 
/*expect_row_count=*/1);
+    }
+}
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/predicate_converter.cpp 
b/src/paimon/format/parquet/predicate_converter.cpp
new file mode 100644
index 0000000..819d2cc
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter.cpp
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/expression.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon::parquet {
+arrow::compute::Expression PredicateConverter::AlwaysTrue() {
+    static const arrow::compute::Expression expr = 
arrow::compute::literal(true);
+    return expr;
+}
+
+Result<arrow::compute::Expression> PredicateConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, uint32_t node_count_limit) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    uint32_t node_count = 0;
+    CollectNodeCount(predicate, &node_count);
+    if (node_count > node_count_limit) {
+        return AlwaysTrue();
+    }
+    return InnerConvert(predicate);
+}
+
+void PredicateConverter::CollectNodeCount(const std::shared_ptr<Predicate>& 
predicate,
+                                          uint32_t* node_count) {
+    const auto& function_type = predicate->GetFunction().GetType();
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        if (function_type == Function::Type::IN || function_type == 
Function::Type::NOT_IN) {
+            // IN and NOT_IN will be converted to Or(Equals) and And(NotEqual)
+            *node_count += leaf_predicate->Literals().size();
+        }
+        *node_count += 1;
+        return;
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        *node_count += 1;
+        for (const auto& child : compound_predicate->Children()) {
+            CollectNodeCount(child, node_count);
+        }
+    }
+}
+
+Result<arrow::compute::Expression> PredicateConverter::InnerConvert(
+    const std::shared_ptr<Predicate>& predicate) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        return ConvertLeaf(leaf_predicate);
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        return ConvertCompound(compound_predicate);
+    }
+    return Status::Invalid("invalid predicate, must be leaf or compound");
+}
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertCompound(
+    const std::shared_ptr<CompoundPredicate>& compound_predicate) {
+    const auto& children = compound_predicate->Children();
+    const auto& function = compound_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::AND: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::OR: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", 
static_cast<int32_t>(function_type)));
+    }
+}
+
+Status PredicateConverter::CheckLiteralNotEmpty(const std::vector<Literal>& 
literals,
+                                                const Function& function,
+                                                const std::string& field_name) 
{
+    if (literals.empty()) {
+        return Status::Invalid(fmt::format("predicate [{}] need literal on 
field {}",
+                                           function.ToString(), field_name));
+    }
+    return Status::OK();
+}
+
+#define CONVERT_TO_ARROW_LITERAL(LITERAL)                                      
           \
+    auto arrow_literal_result = ConvertToArrowLiteral(LITERAL);                
           \
+    if (!arrow_literal_result.ok() && 
arrow_literal_result.status().IsNotImplemented()) { \
+        return AlwaysTrue();                                                   
           \
+    }                                                                          
           \
+    if (!arrow_literal_result.ok()) {                                          
           \
+        return arrow_literal_result.status();                                  
           \
+    }                                                                          
           \
+    auto arrow_literal = std::move(arrow_literal_result).value();
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertLeaf(
+    const std::shared_ptr<LeafPredicate>& leaf_predicate) {
+    const auto& field_name = leaf_predicate->FieldName();
+    const auto& literals = leaf_predicate->Literals();
+    const auto& function = leaf_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::IS_NULL: {
+            return 
arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                           /*nan_is_null=*/false);
+        }
+        case Function::Type::IS_NOT_NULL: {
+            return arrow::compute::not_(
+                arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                        /*nan_is_null=*/false));
+        }
+        case Function::Type::EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::NOT_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::not_equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater_equal(arrow::compute::field_ref(field_name),
+                                                 arrow_literal);
+        }
+        case Function::Type::LESS_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return arrow::compute::less(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        case Function::Type::LESS_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::less_equal(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        // Noted that: java paimon don't support pushdown IN and NOT_IN to 
parquet
+        case Function::Type::IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // in convert to Or(Equals)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                sub_exprs.push_back(
+                    
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        case Function::Type::NOT_IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // not in convert to And(NotEqual)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                
sub_exprs.push_back(arrow::compute::not_equal(arrow::compute::field_ref(field_name),
+                                                              arrow_literal));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::STARTS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("starts_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::ENDS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("ends_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::CONTAINS: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_substring", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::LIKE: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_like", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", 
static_cast<int32_t>(function_type)));
+    }
+    return Status::OK();
+}
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertToArrowLiteral(
+    const Literal& literal) {
+    auto literal_type = literal.GetType();
+    if (literal.IsNull()) {
+        return Status::Invalid("literal cannot be null in predicate");
+    }
+    switch (literal_type) {
+        case FieldType::BOOLEAN:
+            return 
arrow::compute::literal(std::make_shared<arrow::BooleanScalar>(
+                static_cast<bool>(literal.GetValue<bool>())));
+        case FieldType::TINYINT:
+            return arrow::compute::literal(std::make_shared<arrow::Int8Scalar>(
+                static_cast<int8_t>(literal.GetValue<int8_t>())));
+        case FieldType::SMALLINT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int16Scalar>(
+                static_cast<int16_t>(literal.GetValue<int16_t>())));
+        case FieldType::INT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int32Scalar>(
+                static_cast<int32_t>(literal.GetValue<int32_t>())));
+        case FieldType::DATE:
+            return 
arrow::compute::literal(std::make_shared<arrow::Date32Scalar>(
+                static_cast<int32_t>(literal.GetValue<int32_t>())));
+        case FieldType::BIGINT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int64Scalar>(
+                static_cast<int64_t>(literal.GetValue<int64_t>())));
+        case FieldType::FLOAT:
+            return 
arrow::compute::literal(std::make_shared<arrow::FloatScalar>(
+                static_cast<float>(literal.GetValue<float>())));
+        case FieldType::DOUBLE:
+            return 
arrow::compute::literal(std::make_shared<arrow::DoubleScalar>(
+                static_cast<double>(literal.GetValue<double>())));
+        case FieldType::STRING: {
+            auto str = literal.GetValue<std::string>();
+            return 
arrow::compute::literal(std::make_shared<arrow::StringScalar>(str));
+        }
+        case FieldType::DECIMAL: {
+            auto decimal = literal.GetValue<Decimal>();
+            return 
arrow::compute::literal(std::make_shared<arrow::Decimal128Scalar>(
+                arrow::Decimal128(decimal.HighBits(), decimal.LowBits()),
+                arrow::decimal128(decimal.Precision(), decimal.Scale())));
+        }
+        // TODO(lisizhuo.lsz): java paimon does not support BINARY, TIMESTAMP 
and DECIMAL
+        case FieldType::TIMESTAMP:
+        case FieldType::BINARY:
+            return Status::NotImplemented(
+                "Not support Binary and Timestamp predicate push down in 
parquet file "
+                "format");
+        default:
+            return Status::Invalid(
+                fmt::format("invalid literal type {}", 
static_cast<int32_t>(literal_type)));
+    }
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/predicate_converter.h 
b/src/paimon/format/parquet/predicate_converter.h
new file mode 100644
index 0000000..5b44f84
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/expression.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class CompoundPredicate;
+class Function;
+class LeafPredicate;
+class Literal;
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+class PredicateConverter {
+ public:
+    PredicateConverter() = delete;
+    ~PredicateConverter() = delete;
+
+    // convert paimon predicate to arrow expression, if total node count of 
predicate exceed
+    // predicate_node_count_limit, will return AlwaysTrue
+    static Result<arrow::compute::Expression> Convert(const 
std::shared_ptr<Predicate>& predicate,
+                                                      uint32_t 
predicate_node_count_limit);
+
+    static arrow::compute::Expression AlwaysTrue();
+
+ private:
+    static Result<arrow::compute::Expression> InnerConvert(
+        const std::shared_ptr<Predicate>& predicate);
+
+    static void CollectNodeCount(const std::shared_ptr<Predicate>& predicate, 
uint32_t* node_count);
+
+    static Result<arrow::compute::Expression> ConvertCompound(
+        const std::shared_ptr<CompoundPredicate>& compound_predicate);
+
+    static Status CheckLiteralNotEmpty(const std::vector<Literal>& literals,
+                                       const Function& function, const 
std::string& field_name);
+
+    static Result<arrow::compute::Expression> ConvertLeaf(
+        const std::shared_ptr<LeafPredicate>& leaf_predicate);
+
+    static Result<arrow::compute::Expression> ConvertToArrowLiteral(const 
Literal& literal);
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/predicate_converter_test.cpp 
b/src/paimon/format/parquet/predicate_converter_test.cpp
new file mode 100644
index 0000000..a6661dc
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter_test.cpp
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/expression.h"
+#include "gtest/gtest.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+TEST(PredicateConverterTest, TestSimple) {
+    // 
"struct<f0:bigint,f1:double,f2:string,f3:int,f4:tinyint,f5:decimal(6,2),f6:date,f7:timestamp>";
+    {
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::BIGINT);
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("is_null(f0, {nan_is_null=false})", expression.ToString());
+    }
+    {
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::BIGINT);
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("invert(is_null(f0, {nan_is_null=false}))", 
expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                 FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 == 5)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                                 FieldType::INT, Literal(10));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f3 == 10)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/6, 
/*field_name=*/"f6",
+                                                 FieldType::DATE, 
Literal(FieldType::DATE, 10));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f6 == 1970-01-11)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                    FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 != 5)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                       FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 > 5)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                          FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 >= 5)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterOrEqual(
+            /*field_index=*/4, /*field_name=*/"f4", FieldType::TINYINT,
+            Literal(static_cast<int8_t>(16)));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f4 >= 16)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::LessThan(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                    FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 < 5)", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                       FieldType::BIGINT, 
Literal(5l));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f0 <= 5)", expression.ToString());
+    }
+    {
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::BIGINT,
+                                 {Literal(1l), Literal(3l), Literal(5l)});
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(((f0 == 1) or (f0 == 3)) or (f0 == 5))", 
expression.ToString());
+    }
+    {
+        auto predicate =
+            PredicateBuilder::NotIn(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::BIGINT,
+                                    {Literal(1l), Literal(3l), Literal(5l)});
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(((f0 != 1) and (f0 != 3)) and (f0 != 5))", 
expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            const auto predicate,
+            PredicateBuilder::StartsWith(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::STRING,
+                                         Literal(FieldType::STRING, "aab", 
3)));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("starts_with(f0, {pattern=\"aab\", ignore_case=false})", 
expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            const auto predicate,
+            PredicateBuilder::EndsWith(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                       Literal(FieldType::STRING, "bcc", 3)));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("ends_with(f0, {pattern=\"bcc\", ignore_case=false})", 
expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            const auto predicate,
+            PredicateBuilder::Contains(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                       Literal(FieldType::STRING, "abc", 3)));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("match_substring(f0, {pattern=\"abc\", ignore_case=false})",
+                  expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            const auto predicate,
+            PredicateBuilder::Like(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                   Literal(FieldType::STRING, "abc", 3)));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("match_like(f0, {pattern=\"abc\", ignore_case=false})", 
expression.ToString());
+    }
+    {
+        // support decimal precision and scale mismatches between literal and 
data
+        auto predicate = PredicateBuilder::In(/*field_index=*/7, 
/*field_name=*/"f7",
+                                              FieldType::DECIMAL, 
{Literal(Decimal(5, 1, 12345))});
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(f7 == 1234.5)", expression.ToString());
+    }
+    {
+        // do not support pushdown predicate with timestamp literal, will 
always return true
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/6, /*field_name=*/"f6", 
FieldType::TIMESTAMP,
+                                 {Literal(Timestamp(1000, 12345))});
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("true", expression.ToString());
+    }
+    {
+        auto predicate = PredicateBuilder::LessOrEqual(
+            /*field_index=*/0, /*field_name=*/"f0", FieldType::BIGINT, 
Literal(FieldType::BIGINT));
+        ASSERT_NOK_WITH_MSG(
+            PredicateConverter::Convert(predicate, 
/*predicate_node_count_limit=*/100),
+            "literal cannot be null in predicate");
+    }
+}
+
+TEST(PredicateConverterTest, TestCompound) {
+    // 
"struct<f0:bigint,f1:float,f2:string,f3:boolean,f4:date,f5:timestamp,f6:decimal(6,2),f7:binary>";
+    {
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And({
+                PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::BIGINT,
+                                        Literal(3l)),
+                PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                        Literal(static_cast<float>(5.0))),
+                PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2", FieldType::STRING,
+                                        Literal(FieldType::STRING, "apple", 
5)),
+                PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN,
+                                        Literal(true)),
+                PredicateBuilder::Equal(/*field_index=*/4, 
/*field_name=*/"f4", FieldType::DATE,
+                                        Literal(FieldType::DATE, 3)),
+                PredicateBuilder::Equal(/*field_index=*/5, /*field_name=*/"f5",
+                                        FieldType::TIMESTAMP,
+                                        Literal(Timestamp(1725875365442l, 
12000))),
+                PredicateBuilder::Equal(/*field_index=*/6, 
/*field_name=*/"f6", FieldType::DECIMAL,
+                                        Literal(Decimal(6, 2, 123456))),
+                PredicateBuilder::Equal(/*field_index=*/7, 
/*field_name=*/"f7", FieldType::BINARY,
+                                        Literal(FieldType::BINARY, "add", 3)),
+            }));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ(
+            "((((((((f0 == 3) and (f1 == 5)) and (f2 == \"apple\")) and (f3 == 
true)) and (f4 == "
+            "1970-01-04)) and true) and (f6 == 1234.56)) and true)",
+            expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or({
+                PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::BIGINT,
+                                        Literal(3l)),
+                PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                        Literal(static_cast<float>(5.0))),
+                PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2", FieldType::STRING,
+                                        Literal(FieldType::STRING, "apple", 
5)),
+                PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN,
+                                        Literal(true)),
+            }));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("((((f0 == 3) or (f1 == 5)) or (f2 == \"apple\")) or (f3 == 
true))",
+                  expression.ToString());
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::And(
+                     {PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
Literal(true)),
+                      PredicateBuilder::LessThan(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                 FieldType::BIGINT, 
Literal(3l))})
+                     .value(),
+                 PredicateBuilder::And(
+                     {PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
Literal(false)),
+                      PredicateBuilder::LessThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                 FieldType::FLOAT,
+                                                 
Literal(static_cast<float>(3.1)))})
+                     .value()}));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(((f3 == true) and (f0 < 3)) or ((f3 == false) and (f1 < 
3.1)))",
+                  expression.ToString());
+    }
+    {
+        // predicate nodes containing binary type will not be pushed down
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::Or(
+                     {PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
Literal(true)),
+                      PredicateBuilder::LessThan(
+                          /*field_index=*/7, /*field_name=*/"f7", 
FieldType::BINARY,
+                          Literal(FieldType::BINARY, "add", 3))})
+                     .value(),
+                 PredicateBuilder::Or(
+                     {PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
Literal(false)),
+                      PredicateBuilder::LessThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                 FieldType::FLOAT,
+                                                 
Literal(static_cast<float>(3.1)))})
+                     .value()}));
+        ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+                                                  predicate, 
/*predicate_node_count_limit=*/100));
+        ASSERT_EQ("(((f3 == true) or true) and ((f3 == false) or (f1 < 3.1)))",
+                  expression.ToString());
+    }
+}
+
+TEST(PredicateConverterTest, TestCollectNodeCount) {
+    // And([And([Equal(f0, 10), NotEqual(f1, 20)]), Or([GreaterThan(v0, 30), 
LessThan(f3, 20)]),
+    // Or([LessOrEqual(f2, 50), In(f0, [20, 60]), In(f0, [120, 160])]), 
GreaterOrEqual(v1, 120)])
+    auto equal = PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::INT,
+                                         Literal(10));
+    auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                FieldType::INT, Literal(20));
+    // and_predicate has 3 nodes
+    ASSERT_OK_AND_ASSIGN(auto and_predicate, PredicateBuilder::And({equal, 
not_equal}));
+    uint32_t node_count = 0;
+    PredicateConverter::CollectNodeCount(and_predicate, &node_count);
+    ASSERT_EQ(node_count, 3);
+
+    auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/2, 
/*field_name=*/"v0",
+                                                      FieldType::INT, 
Literal(30));
+    auto less_than = PredicateBuilder::LessThan(/*field_index=*/3, 
/*field_name=*/"f3",
+                                                FieldType::INT, Literal(20));
+    // or_predicate has 3 nodes
+    ASSERT_OK_AND_ASSIGN(auto or_predicate, 
PredicateBuilder::Or({greater_than, less_than}));
+    node_count = 0;
+    PredicateConverter::CollectNodeCount(or_predicate, &node_count);
+    ASSERT_EQ(node_count, 3);
+
+    auto less_or_equal = PredicateBuilder::LessOrEqual(/*field_index=*/4, 
/*field_name=*/"f2",
+                                                       FieldType::INT, 
Literal(50));
+    auto in = PredicateBuilder::In(/*field_index=*/5, /*field_name=*/"f0", 
FieldType::INT,
+                                   {Literal(20), Literal(60)});
+    auto not_in = PredicateBuilder::NotIn(/*field_index=*/5, 
/*field_name=*/"f0", FieldType::INT,
+                                          {Literal(120), Literal(160)});
+    // or_predicate2 has 8 nodes
+    ASSERT_OK_AND_ASSIGN(auto or_predicate2, 
PredicateBuilder::Or({less_or_equal, in, not_in}));
+    node_count = 0;
+    PredicateConverter::CollectNodeCount(or_predicate2, &node_count);
+    ASSERT_EQ(node_count, 8);
+
+    auto greater_or_equal = 
PredicateBuilder::GreaterOrEqual(/*field_index=*/4, /*field_name=*/"v1",
+                                                             FieldType::INT, 
Literal(120));
+    node_count = 0;
+    PredicateConverter::CollectNodeCount(greater_or_equal, &node_count);
+    ASSERT_EQ(node_count, 1);
+
+    // predicate has (1+3+3+8+1) nodes
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({and_predicate, 
or_predicate,
+                                                                or_predicate2, 
greater_or_equal}));
+    node_count = 0;
+    PredicateConverter::CollectNodeCount(predicate, &node_count);
+    ASSERT_EQ(node_count, 16);
+}
+
+TEST(PredicateConverterTest, TestExceedNodeCountLimit) {
+    // And([And([Equal(f0, 10), NotEqual(f1, 20)]), Or([GreaterThan(v0, 30), 
LessThan(f3, 20)]),
+    // Or([LessOrEqual(f2, 50), In(f0, [20, 60]), In(f0, [120, 160])]), 
GreaterOrEqual(v1, 120)])
+    auto equal = PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::INT,
+                                         Literal(10));
+    auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                FieldType::INT, Literal(20));
+    // and_predicate has 3 nodes
+    ASSERT_OK_AND_ASSIGN(auto and_predicate, PredicateBuilder::And({equal, 
not_equal}));
+
+    auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/2, 
/*field_name=*/"v0",
+                                                      FieldType::INT, 
Literal(30));
+    auto less_than = PredicateBuilder::LessThan(/*field_index=*/3, 
/*field_name=*/"f3",
+                                                FieldType::INT, Literal(20));
+    // or_predicate has 3 nodes
+    ASSERT_OK_AND_ASSIGN(auto or_predicate, 
PredicateBuilder::Or({greater_than, less_than}));
+
+    auto less_or_equal = PredicateBuilder::LessOrEqual(/*field_index=*/4, 
/*field_name=*/"f2",
+                                                       FieldType::INT, 
Literal(50));
+    auto in = PredicateBuilder::In(/*field_index=*/5, /*field_name=*/"f0", 
FieldType::INT,
+                                   {Literal(20), Literal(60)});
+    auto not_in = PredicateBuilder::NotIn(/*field_index=*/5, 
/*field_name=*/"f0", FieldType::INT,
+                                          {Literal(120), Literal(160)});
+    // or_predicate2 has 8 nodes
+    ASSERT_OK_AND_ASSIGN(auto or_predicate2, 
PredicateBuilder::Or({less_or_equal, in, not_in}));
+
+    auto greater_or_equal = 
PredicateBuilder::GreaterOrEqual(/*field_index=*/4, /*field_name=*/"v1",
+                                                             FieldType::INT, 
Literal(120));
+
+    // predicate has (1+3+3+8+1) nodes
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({and_predicate, 
or_predicate,
+                                                                or_predicate2, 
greater_or_equal}));
+
+    ASSERT_OK_AND_ASSIGN(auto expression,
+                         PredicateConverter::Convert(predicate, 
/*predicate_node_count_limit=*/20));
+    ASSERT_EQ(expression.ToString(),
+              "(((((f0 == 10) and (f1 != 20)) and ((v0 > 30) or (f3 < 20))) 
and (((f2 <= 50) or "
+              "((f0 == 20) or (f0 == 60))) or ((f0 != 120) and (f0 != 160)))) 
and (v1 >= 120))");
+    ASSERT_OK_AND_ASSIGN(auto expression_always_true,
+                         PredicateConverter::Convert(predicate, 
/*predicate_node_count_limit=*/10));
+    ASSERT_EQ(expression_always_true.ToString(), "true");
+}
+
+TEST(PredicateConverterTest, TestWithoutPredicate) {
+    ASSERT_OK_AND_ASSIGN(auto expression,
+                         PredicateConverter::Convert(nullptr, 
/*predicate_node_count_limit=*/100));
+    ASSERT_EQ("true", expression.ToString());
+}
+
+TEST(PredicateConverterTest, TestInvalidCase) {
+    auto predicate =
+        PredicateBuilder::In(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::INT, {});
+    ASSERT_NOK_WITH_MSG(PredicateConverter::Convert(predicate, 
/*predicate_node_count_limit=*/100),
+                        "predicate [In] need literal on field f0");
+}
+
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp 
b/src/paimon/format/parquet/predicate_pushdown_test.cpp
new file mode 100644
index 0000000..67d7cd5
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp
@@ -0,0 +1,813 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet::test {
+
+class PredicatePushdownTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+        batch_size_ = 10;
+
+        arrow::FieldVector fields = {
+            arrow::field("f0", arrow::utf8()),  arrow::field("f1", 
arrow::float32()),
+            arrow::field("f2", arrow::int64()), arrow::field("f3", 
arrow::boolean()),
+            arrow::field("f4", arrow::int64()), arrow::field("f5", 
arrow::binary())};
+
+        struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), 
R"([
+        ["apple", 4.0, 4, true, null, "add"],  ["banana", 4.0, 6, true, null, 
"bad"],
+        ["camera", 4.0, 8, true, null, "cat"], ["data", null, 10, true, null, 
"dad"]
+    ])")
+                .ValueOrDie());
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        file_name_ = dir_->Str() + "/test.data";
+        fs_ = dir_->GetFileSystem();
+    }
+
+    void TearDown() override {}
+
+    void PrepareTestData(const std::shared_ptr<arrow::StructArray>& 
struct_array) {
+        auto data_type = struct_array->struct_type();
+        auto data_schema = arrow::schema(data_type->fields());
+        auto data_arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*struct_array, 
data_arrow_array.get()).ok());
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_name_, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.write_batch_size(batch_size_);
+        auto writer_properties = builder.build();
+        ASSERT_OK_AND_ASSIGN(
+            auto format_writer,
+            ParquetFormatWriter::Create(out, data_schema, writer_properties,
+                                        DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+        ASSERT_OK(format_writer->AddBatch(data_arrow_array.get()));
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Close());
+    }
+
+    void CheckResult(const std::shared_ptr<arrow::Schema>& read_schema,
+                     const std::shared_ptr<Predicate>& predicate,
+                     const std::shared_ptr<arrow::Array>& expected_array,
+                     uint32_t predicate_node_count_limit =
+                         
paimon::parquet::DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT) {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_name_));
+        ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
+        auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in, 
arrow_pool_, length);
+
+        std::map<std::string, std::string> options;
+        options[paimon::parquet::PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT] =
+            std::to_string(predicate_node_count_limit);
+        ASSERT_OK_AND_ASSIGN(auto batch_reader,
+                             
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_,
+                                                            options, 
batch_size_));
+        std::unique_ptr<ArrowSchema> c_schema = 
std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        ASSERT_TRUE(arrow_status.ok());
+        ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate,
+                                              
/*selection_bitmap=*/std::nullopt));
+        ASSERT_OK_AND_ASSIGN(auto arrow_array,
+                             
paimon::test::ReadResultCollector::CollectResult(batch_reader.get()));
+        if (expected_array) {
+            ASSERT_TRUE(arrow_array);
+            auto expected_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(expected_chunk_array->Equals(arrow_array)) << 
arrow_array->ToString();
+        } else {
+            ASSERT_FALSE(arrow_array);
+        }
+    }
+
+ private:
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    std::shared_ptr<MemoryPool> pool_;
+    int32_t batch_size_;
+    std::shared_ptr<arrow::StructArray> struct_array_;
+    std::shared_ptr<FileSystem> fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::string file_name_;
+};
+
+TEST_F(PredicatePushdownTest, TestIntDoubleData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f1 == 4, has data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", 
FieldType::FLOAT,
+                                    Literal(static_cast<float>(4.0)));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f1 == 6, no data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", 
FieldType::FLOAT,
+                                    Literal(static_cast<float>(6.0)));
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    nullptr);
+    }
+    {
+        // f1 != 4, no data
+        auto predicate = PredicateBuilder::NotEqual(
+            /*field_index=*/1, /*field_name=*/"f1", FieldType::FLOAT,
+            Literal(static_cast<float>(4.0)));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 != 4, has data
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                    FieldType::BIGINT, 
Literal(4l));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 == 6, has data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                 FieldType::BIGINT, 
Literal(6l));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 == 1, no data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                 FieldType::BIGINT, 
Literal(1l));
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    nullptr);
+    }
+    {
+        // f2 in [1,2,3], no data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 in [1,2,3] but has small predicate node limit, has data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, expected_array,
+                    /*predicate_node_count_limit=*/1);
+    }
+    {
+        // f2 not in [1,2,3], has data
+        auto predicate =
+            PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                    {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 in [2,3,4], has data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(2l), Literal(3l), Literal(4l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 not in [2,3,4], has data
+        auto predicate =
+            PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                    {Literal(2l), Literal(3l), Literal(4l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestBoolData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f3 is null, no data
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f3", 
FieldType::BOOLEAN);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f3 is not null, has data
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f3 == true, has data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                                 FieldType::BOOLEAN, 
Literal(true));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::In(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
{Literal(false)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestStringData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f0 is null, no data
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 is not null, has data
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::STRING);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f0 == apple, has data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                    Literal(FieldType::STRING, "apple", 5));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f0 == anything, no data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                    Literal(FieldType::STRING, "anything", 8));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 > zooooooo, no data
+        auto predicate = PredicateBuilder::GreaterThan(
+            /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
+            Literal(FieldType::STRING, "zooooooo", 8));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 like 'ba%', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::StartsWith(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "ba", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like '%ta', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::EndsWith(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "ta", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like '%me%', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::Contains(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "me", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like 'me', no data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::Like(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "me", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestBinaryData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f5")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make({struct_array_->GetFieldByName("f5")}, 
fields).ValueOrDie();
+    // paimon does not pushdown binary type to parquet, will skip this 
predicate
+    {
+        // f5 < aaa, do not have data, but binary predicate will be skipped
+        auto predicate =
+            PredicateBuilder::LessThan(/*field_index=*/5, /*field_name=*/"f5", 
FieldType::BINARY,
+                                       Literal(FieldType::BINARY, "aaa", 3));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f5 >= zoo, do not have data, but binary predicate will be skipped
+        auto predicate = PredicateBuilder::GreaterOrEqual(
+            /*field_index=*/5, /*field_name=*/"f5", FieldType::BINARY,
+            Literal(FieldType::BINARY, "zoo", 3));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestPredicatePushdownWithAllDataNull) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f4 is null, has data
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/4, /*field_name=*/"f4", 
FieldType::BIGINT);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+
+    // other predicate, always return IS_NULL (no data)
+    {
+        // f4 in [1,2], no data
+        auto predicate = PredicateBuilder::In(/*field_index=*/4, 
/*field_name=*/"f4",
+                                              FieldType::BIGINT, {Literal(1l), 
Literal(2l)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f4 not in [1,2], no data
+        auto predicate = PredicateBuilder::NotIn(/*field_index=*/4, 
/*field_name=*/"f4",
+                                                 FieldType::BIGINT, 
{Literal(1l), Literal(2l)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f4 >= 3, no data
+        auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/4, 
/*field_name=*/"f4",
+                                                          FieldType::BIGINT, 
Literal(3l));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f4 <= 3, no data
+        auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/4, 
/*field_name=*/"f4",
+                                                       FieldType::BIGINT, 
Literal(3l));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestCompoundPredicate) {
+    PrepareTestData(struct_array_);
+    auto read_schema = arrow::schema(struct_array_->struct_type()->fields());
+    std::shared_ptr<arrow::Array> expected_array = struct_array_;
+    {
+        // f2 < 6 and f1 == 4 and f3 == true, has data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0))),
+                 PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN,
+                                         Literal(true))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 6 and f1 == 4 and f3 is null, no data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0))),
+                 PredicateBuilder::IsNull(/*field_index=*/3, 
/*field_name=*/"f3",
+                                          FieldType::BOOLEAN)}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 < 6 and f1 == 4 and f5 is null, no data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0))),
+                 PredicateBuilder::IsNull(/*field_index=*/5, 
/*field_name=*/"f5",
+                                          FieldType::BINARY)}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 < 6 and f1 == 4 and f5 == zoo, has data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0))),
+                 PredicateBuilder::Equal(/*field_index=*/5, 
/*field_name=*/"f5", FieldType::BINARY,
+                                         Literal(FieldType::BINARY, "zoo", 
3))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 6 and f1 == 5 and f5 is null, no data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::And(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(5.0))),
+                 PredicateBuilder::IsNull(/*field_index=*/5, 
/*field_name=*/"f5",
+                                          FieldType::BINARY)}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 < 6 or f1 == 4, has data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0)))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 6 or f1 == 5, has data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(6l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(5.0)))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 2 or f5 is null, no data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            
PredicateBuilder::Or({PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                             
FieldType::BIGINT, Literal(2l)),
+                                  PredicateBuilder::IsNull(/*field_index=*/5, 
/*field_name=*/"f5",
+                                                           
FieldType::BINARY)}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, nullptr);
+    }
+    {
+        // f2 < 2 or f5 = zoo, ignore <or predicate> as it contains binary
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(2l)),
+                 PredicateBuilder::Equal(/*field_index=*/5, 
/*field_name=*/"f5", FieldType::BINARY,
+                                         Literal(FieldType::BINARY, "zoo", 
3))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 2 or f1 == 4 or f3 == false, has data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(2l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(4.0))),
+                 PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN,
+                                         Literal(false))}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 < 2 or f1 == 5 or f3 is null, no data
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/2, 
/*field_name=*/"f2",
+                                            FieldType::BIGINT, Literal(2l)),
+                 PredicateBuilder::Equal(/*field_index=*/1, 
/*field_name=*/"f1", FieldType::FLOAT,
+                                         Literal(static_cast<float>(5.0))),
+                 PredicateBuilder::IsNull(/*field_index=*/3, 
/*field_name=*/"f3",
+                                          FieldType::BOOLEAN)}));
+        ASSERT_TRUE(predicate);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestComplexType) {
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::int32()),
+        arrow::field("f3", arrow::date32()),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f5", arrow::decimal128(23, 5)),
+    };
+    auto read_schema = arrow::schema(fields);
+    auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [10, 1, 1234,  "2033-05-18 03:33:20.0",         
"123456789987654321.45678"],
+        [10, 1, 19909, "2033-05-18 03:33:20.000001001", "12.30000"],
+        [10, 1, 0,     "2008-12-28 00:00:00.000123456", "12.30000"],
+        [10, 1, 100,   "2008-12-28 00:00:00.00012345",  "-123.45000"],
+        [10, 1, 100,  "1899-01-01 00:59:20.001001001", "0.00000"],
+        [10, 1, 20006, "2024-10-10 10:10:10.100100100", "1728551410100.10010"]
+    ])")
+            .ValueOrDie());
+    PrepareTestData(expected_array);
+    //  date
+    {
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f3", 
FieldType::DATE);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/2, 
/*field_name=*/"f3", FieldType::DATE);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f3",
+                                                 FieldType::DATE, 
Literal(FieldType::DATE, 4));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f3",
+                                                 FieldType::DATE, 
Literal(FieldType::DATE, -111));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::LessThan(
+            /*field_index=*/2, /*field_name=*/"f3", FieldType::DATE,
+            Literal(FieldType::DATE, 20006));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterThan(
+            /*field_index=*/2, /*field_name=*/"f3", FieldType::DATE,
+            Literal(FieldType::DATE, 20006));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+
+    // timestamp, always be ignored
+    {
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f4", 
FieldType::TIMESTAMP);
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::IsNotNull(/*field_index=*/3, 
/*field_name=*/"f4",
+                                                     FieldType::TIMESTAMP);
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    expected_array);
+    }
+    {
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f4", 
FieldType::TIMESTAMP,
+                                    Literal(Timestamp(2240521239999l, 0)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        auto predicate =
+            PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f4", 
FieldType::TIMESTAMP,
+                                       Literal(Timestamp(1230422400000l, 
123460)));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, 
/*field_name=*/"f4",
+                                                       FieldType::TIMESTAMP,
+                                                       
Literal(Timestamp(2000000000000l, 1001)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+
+    // decimal
+    {
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/4, /*field_name=*/"f5", 
FieldType::DECIMAL);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/4, 
/*field_name=*/"f5", FieldType::DECIMAL);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            Literal(Decimal(23, 5, 123456)));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            Literal(Decimal(22, 3, -123456)));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::LessThan(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            Literal(Decimal(23, 3, 
DecimalUtils::StrToInt128("-123456789987654321567").value())));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::LessThan(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            Literal(Decimal(23, 3, 
DecimalUtils::StrToInt128("123456789987654321567").value())));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterThan(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            Literal(Decimal(23, 3, 
DecimalUtils::StrToInt128("123456789987654321567").value())));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::In(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            {Literal(Decimal(23, 5, 
DecimalUtils::StrToInt128("-12345678998765432134567").value())),
+             Literal(Decimal(23, 5, 1234567))});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::NotIn(
+            /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+            {Literal(Decimal(23, 5, 
DecimalUtils::StrToInt128("-12345678998765432134567").value())),
+             Literal(Decimal(23, 5, 1234567))});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestAllNullOrAllSameValue) {
+    arrow::FieldVector fields = {arrow::field("f1", arrow::int32()),
+                                 arrow::field("f2", arrow::int32())};
+    auto read_schema = arrow::schema(fields);
+    auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [null, 10],
+        [null, 10],
+        [null, 10]
+    ])")
+            .ValueOrDie());
+    PrepareTestData(expected_array);
+    // for f1
+    {
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f1", 
FieldType::INT);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/0, 
/*field_name=*/"f1", FieldType::INT);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                 FieldType::INT, Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                    FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                       FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                          FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::LessThan(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                    FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                       FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::In(/*field_index=*/0, 
/*field_name=*/"f1",
+                                              FieldType::INT, {Literal(10), 
Literal(20)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::NotIn(/*field_index=*/0, 
/*field_name=*/"f1",
+                                                 FieldType::INT, {Literal(10), 
Literal(20)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    // for f2
+    {
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/1, 
/*field_name=*/"f2",
+                                                    FieldType::INT, 
Literal(10));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/1, 
/*field_name=*/"f2",
+                                                    FieldType::INT, 
Literal(30));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::In(/*field_index=*/1, 
/*field_name=*/"f2",
+                                              FieldType::INT, {Literal(10), 
Literal(20)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::NotIn(/*field_index=*/1, 
/*field_name=*/"f2",
+                                                 FieldType::INT, {Literal(10), 
Literal(20)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        auto predicate = PredicateBuilder::NotIn(/*field_index=*/1, 
/*field_name=*/"f2",
+                                                 FieldType::INT, {Literal(20), 
Literal(30)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+}
+}  // namespace paimon::parquet::test

Reply via email to