Copilot commented on code in PR #53:
URL: https://github.com/apache/paimon-cpp/pull/53#discussion_r3370627972


##########
src/paimon/format/parquet/predicate_converter.h:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/expression.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class CompoundPredicate;
+class Function;
+class LeafPredicate;
+class Literal;
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+class PredicateConverter {
+ public:
+    PredicateConverter() = delete;
+    ~PredicateConverter() = delete;
+
+    // convert paimon predicate to arrow expression, if total node count of 
predicate exceed
+    // predicate_node_count_limit, will return AlwaysTrue
+    static Result<arrow::compute::Expression> Convert(const 
std::shared_ptr<Predicate>& predicate,
+                                                      uint32_t 
predicate_node_count_limit);
+
+    static arrow::compute::Expression AlwaysTrue();
+
+ private:
+    static Result<arrow::compute::Expression> InnerConvert(
+        const std::shared_ptr<Predicate>& predicate);
+
+    static void CollectNodeCount(const std::shared_ptr<Predicate>& predicate, 
uint32_t* node_count);

Review Comment:
   `CollectNodeCount` is declared `private`, but it is called directly by 
`predicate_converter_test.cpp` (`PredicateConverter::CollectNodeCount(...)`). 
This will not compile. Make `CollectNodeCount` public, or keep it private and 
update the test to validate node counting indirectly via `Convert(..., 
predicate_node_count_limit)` (or add a test-only friend declaration).



##########
src/paimon/format/parquet/predicate_pushdown_test.cpp:
##########
@@ -0,0 +1,813 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet::test {
+
+class PredicatePushdownTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+        batch_size_ = 10;
+
+        arrow::FieldVector fields = {
+            arrow::field("f0", arrow::utf8()),  arrow::field("f1", 
arrow::float32()),
+            arrow::field("f2", arrow::int64()), arrow::field("f3", 
arrow::boolean()),
+            arrow::field("f4", arrow::int64()), arrow::field("f5", 
arrow::binary())};
+
+        struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), 
R"([
+        ["apple", 4.0, 4, true, null, "add"],  ["banana", 4.0, 6, true, null, 
"bad"],
+        ["camera", 4.0, 8, true, null, "cat"], ["data", null, 10, true, null, 
"dad"]
+    ])")
+                .ValueOrDie());
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        file_name_ = dir_->Str() + "/test.data";
+        fs_ = dir_->GetFileSystem();
+    }
+
+    void TearDown() override {}
+
+    void PrepareTestData(const std::shared_ptr<arrow::StructArray>& 
struct_array) {
+        auto data_type = struct_array->struct_type();
+        auto data_schema = arrow::schema(data_type->fields());
+        auto data_arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*struct_array, 
data_arrow_array.get()).ok());

Review Comment:
   `arrow::ExportArray` uses the Arrow C Data Interface and requires calling 
`ArrowArray::release` to free the exported buffers/private data. In this test 
helper the exported `ArrowArray` is never released, which can leak memory and 
trip ASAN/LSAN in CI. Consider adding a small RAII guard (or manually calling 
`data_arrow_array->release(data_arrow_array.get())` after `AddBatch`) for 
exported `ArrowArray` / `ArrowSchema` objects in this test file.



##########
src/paimon/format/parquet/predicate_converter.cpp:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/expression.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon::parquet {
+arrow::compute::Expression PredicateConverter::AlwaysTrue() {
+    static const arrow::compute::Expression expr = 
arrow::compute::literal(true);
+    return expr;
+}
+
+Result<arrow::compute::Expression> PredicateConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, uint32_t node_count_limit) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    uint32_t node_count = 0;
+    CollectNodeCount(predicate, &node_count);
+    if (node_count > node_count_limit) {
+        return AlwaysTrue();
+    }
+    return InnerConvert(predicate);
+}
+
+void PredicateConverter::CollectNodeCount(const std::shared_ptr<Predicate>& 
predicate,
+                                          uint32_t* node_count) {
+    const auto& function_type = predicate->GetFunction().GetType();
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        if (function_type == Function::Type::IN || function_type == 
Function::Type::NOT_IN) {
+            // IN and NOT_IN will be converted to Or(Equals) and And(NotEqual)
+            *node_count += leaf_predicate->Literals().size();
+        }
+        *node_count += 1;

Review Comment:
   The node-count accounting for `IN` / `NOT_IN` appears inconsistent with the 
comment and the actual conversion. `IN` is converted into an OR-tree of 
`equal(...)` expressions (and `NOT_IN` into an AND-tree of `not_equal(...)`), 
which typically results in `#comparisons + (#comparisons - 1)` nodes (e.g., 3 
literals -> 3 equals + 2 ors = 5). Current logic adds `literals.size() + 1` (3 
-> 4), which undercounts and may let overly-large predicates slip under 
`predicate_node_count_limit`. Consider counting as `2 * literals.size() - 1` 
for `IN`/`NOT_IN` (or otherwise aligning the counting rule with the conversion 
strategy used by `arrow::compute::or_(...)` / `and_(...)`).



##########
src/paimon/format/parquet/parquet_stats_extractor_test.cpp:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+
+#include <cstddef>
+#include <map>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/compare.h"
+#include "arrow/io/file.h"
+#include "arrow/ipc/api.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/stats/simple_stats_converter.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace paimon::parquet::test {
+
+class ParquetStatsExtractorTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    void CheckStats(const arrow::FieldVector& fields, const std::string& input,
+                    const std::vector<std::string>& expected_stats, int64_t 
expect_row_count) {
+        auto arrow_schema = arrow::schema(fields);
+        auto struct_type = arrow::struct_(fields);
+        std::map<std::string, std::string> options;
+        std::shared_ptr<arrow::MemoryPool> pool = 
GetArrowPool(GetDefaultPool());
+        std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+        std::string file_name;
+        ASSERT_TRUE(UUID::Generate(&file_name));
+        std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs->Create(file_path, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.enable_store_decimal_as_integer();
+        ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create(
+                                                     out, arrow_schema, 
builder.build(),
+                                                     
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, pool));
+        auto array = arrow::ipc::internal::json::ArrayFromJSON(struct_type, 
input).ValueOrDie();
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        ASSERT_OK(format_writer->AddBatch(arrow_array.get()));

Review Comment:
   Similar to the predicate pushdown tests, `arrow::ExportArray` requires 
calling `ArrowArray::release` to avoid leaks. `arrow_array` is never released 
after `AddBatch`, which can leak in sanitizer builds. Please add a release/RAII 
guard for exported C-data objects used in this test helper (and similarly for 
the stack `ArrowArray c_array` in `TestNullForAllType`).



##########
src/paimon/format/parquet/predicate_converter.cpp:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/expression.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon::parquet {
+arrow::compute::Expression PredicateConverter::AlwaysTrue() {
+    static const arrow::compute::Expression expr = 
arrow::compute::literal(true);
+    return expr;
+}
+
+Result<arrow::compute::Expression> PredicateConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, uint32_t node_count_limit) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    uint32_t node_count = 0;
+    CollectNodeCount(predicate, &node_count);
+    if (node_count > node_count_limit) {
+        return AlwaysTrue();
+    }
+    return InnerConvert(predicate);
+}
+
+void PredicateConverter::CollectNodeCount(const std::shared_ptr<Predicate>& 
predicate,
+                                          uint32_t* node_count) {
+    const auto& function_type = predicate->GetFunction().GetType();
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        if (function_type == Function::Type::IN || function_type == 
Function::Type::NOT_IN) {
+            // IN and NOT_IN will be converted to Or(Equals) and And(NotEqual)
+            *node_count += leaf_predicate->Literals().size();
+        }
+        *node_count += 1;
+        return;
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        *node_count += 1;
+        for (const auto& child : compound_predicate->Children()) {
+            CollectNodeCount(child, node_count);
+        }
+    }
+}
+
+Result<arrow::compute::Expression> PredicateConverter::InnerConvert(
+    const std::shared_ptr<Predicate>& predicate) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        return ConvertLeaf(leaf_predicate);
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        return ConvertCompound(compound_predicate);
+    }
+    return Status::Invalid("invalid predicate, must be leaf or compound");
+}
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertCompound(
+    const std::shared_ptr<CompoundPredicate>& compound_predicate) {
+    const auto& children = compound_predicate->Children();
+    const auto& function = compound_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::AND: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::OR: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", 
static_cast<int32_t>(function_type)));
+    }
+}
+
+Status PredicateConverter::CheckLiteralNotEmpty(const std::vector<Literal>& 
literals,
+                                                const Function& function,
+                                                const std::string& field_name) 
{
+    if (literals.empty()) {
+        return Status::Invalid(fmt::format("predicate [{}] need literal on 
field {}",
+                                           function.ToString(), field_name));
+    }
+    return Status::OK();
+}
+
+#define CONVERT_TO_ARROW_LITERAL(LITERAL)                                      
           \
+    auto arrow_literal_result = ConvertToArrowLiteral(LITERAL);                
           \
+    if (!arrow_literal_result.ok() && 
arrow_literal_result.status().IsNotImplemented()) { \
+        return AlwaysTrue();                                                   
           \
+    }                                                                          
           \
+    if (!arrow_literal_result.ok()) {                                          
           \
+        return arrow_literal_result.status();                                  
           \
+    }                                                                          
           \
+    auto arrow_literal = std::move(arrow_literal_result).value();
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertLeaf(
+    const std::shared_ptr<LeafPredicate>& leaf_predicate) {
+    const auto& field_name = leaf_predicate->FieldName();
+    const auto& literals = leaf_predicate->Literals();
+    const auto& function = leaf_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::IS_NULL: {
+            return 
arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                           /*nan_is_null=*/false);
+        }
+        case Function::Type::IS_NOT_NULL: {
+            return arrow::compute::not_(
+                arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                        /*nan_is_null=*/false));
+        }
+        case Function::Type::EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::NOT_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::not_equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater_equal(arrow::compute::field_ref(field_name),
+                                                 arrow_literal);
+        }
+        case Function::Type::LESS_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return arrow::compute::less(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        case Function::Type::LESS_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::less_equal(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        // Noted that: java paimon don't support pushdown IN and NOT_IN to 
parquet
+        case Function::Type::IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // in convert to Or(Equals)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                sub_exprs.push_back(
+                    
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        case Function::Type::NOT_IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // not in convert to And(NotEqual)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                
sub_exprs.push_back(arrow::compute::not_equal(arrow::compute::field_ref(field_name),
+                                                              arrow_literal));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::STARTS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("starts_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::ENDS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("ends_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::CONTAINS: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_substring", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::LIKE: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_like", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }

Review Comment:
   Arrow C++ documents the `match_like` kernel as taking 
`MatchSubstringOptions`, the same options class used by `match_substring`, 
`starts_with`, and `ends_with`; there is no separate `MatchLikeOptions` type. 
The options type passed here is therefore the one the kernel expects. Please 
confirm this against the Arrow version the project pins, and consider adding a 
test that binds and executes a `LIKE` predicate so that any options mismatch 
surfaces as a test failure rather than as a runtime validation error.



##########
src/paimon/format/parquet/predicate_pushdown_test.cpp:
##########
@@ -0,0 +1,813 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet::test {
+
+class PredicatePushdownTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+        batch_size_ = 10;
+
+        arrow::FieldVector fields = {
+            arrow::field("f0", arrow::utf8()),  arrow::field("f1", 
arrow::float32()),
+            arrow::field("f2", arrow::int64()), arrow::field("f3", 
arrow::boolean()),
+            arrow::field("f4", arrow::int64()), arrow::field("f5", 
arrow::binary())};
+
+        struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), 
R"([
+        ["apple", 4.0, 4, true, null, "add"],  ["banana", 4.0, 6, true, null, 
"bad"],
+        ["camera", 4.0, 8, true, null, "cat"], ["data", null, 10, true, null, 
"dad"]
+    ])")
+                .ValueOrDie());
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        file_name_ = dir_->Str() + "/test.data";
+        fs_ = dir_->GetFileSystem();
+    }
+
+    void TearDown() override {}
+
+    void PrepareTestData(const std::shared_ptr<arrow::StructArray>& 
struct_array) {
+        auto data_type = struct_array->struct_type();
+        auto data_schema = arrow::schema(data_type->fields());
+        auto data_arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*struct_array, 
data_arrow_array.get()).ok());
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_name_, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.write_batch_size(batch_size_);
+        auto writer_properties = builder.build();
+        ASSERT_OK_AND_ASSIGN(
+            auto format_writer,
+            ParquetFormatWriter::Create(out, data_schema, writer_properties,
+                                        DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+        ASSERT_OK(format_writer->AddBatch(data_arrow_array.get()));
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Close());
+    }
+
+    void CheckResult(const std::shared_ptr<arrow::Schema>& read_schema,
+                     const std::shared_ptr<Predicate>& predicate,
+                     const std::shared_ptr<arrow::Array>& expected_array,
+                     uint32_t predicate_node_count_limit =
+                         
paimon::parquet::DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT) {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_name_));
+        ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
+        auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in, 
arrow_pool_, length);
+
+        std::map<std::string, std::string> options;
+        options[paimon::parquet::PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT] =
+            std::to_string(predicate_node_count_limit);
+        ASSERT_OK_AND_ASSIGN(auto batch_reader,
+                             
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_,
+                                                            options, 
batch_size_));
+        std::unique_ptr<ArrowSchema> c_schema = 
std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        ASSERT_TRUE(arrow_status.ok());
+        ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate,
+                                              
/*selection_bitmap=*/std::nullopt));
+        ASSERT_OK_AND_ASSIGN(auto arrow_array,
+                             
paimon::test::ReadResultCollector::CollectResult(batch_reader.get()));
+        if (expected_array) {
+            ASSERT_TRUE(arrow_array);
+            auto expected_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(expected_chunk_array->Equals(arrow_array)) << 
arrow_array->ToString();
+        } else {
+            ASSERT_FALSE(arrow_array);
+        }
+    }
+
+ private:
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    std::shared_ptr<MemoryPool> pool_;
+    int32_t batch_size_;
+    std::shared_ptr<arrow::StructArray> struct_array_;
+    std::shared_ptr<FileSystem> fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::string file_name_;
+};
+
+TEST_F(PredicatePushdownTest, TestIntDoubleData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f1 == 4, has data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", 
FieldType::FLOAT,
+                                    Literal(static_cast<float>(4.0)));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f1 == 6, no data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", 
FieldType::FLOAT,
+                                    Literal(static_cast<float>(6.0)));
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    nullptr);
+    }
+    {
+        // f1 != 4, no data
+        auto predicate = PredicateBuilder::NotEqual(
+            /*field_index=*/1, /*field_name=*/"f1", FieldType::FLOAT,
+            Literal(static_cast<float>(4.0)));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 != 4, has data
+        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                    FieldType::BIGINT, 
Literal(4l));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 == 6, has data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                 FieldType::BIGINT, 
Literal(6l));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 == 1, no data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, 
/*field_name=*/"f2",
+                                                 FieldType::BIGINT, 
Literal(1l));
+        CheckResult(read_schema, predicate, /*expected_array=*/
+                    nullptr);
+    }
+    {
+        // f2 in [1,2,3], no data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f2 in [1,2,3] but has small predicate node limit, has data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, expected_array,
+                    /*predicate_node_count_limit=*/1);
+    }
+    {
+        // f2 not in [1,2,3], has data
+        auto predicate =
+            PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                    {Literal(1l), Literal(2l), Literal(3l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 in [2,3,4], has data
+        auto predicate =
+            PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                 {Literal(2l), Literal(3l), Literal(4l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f2 not in [2,3,4], has data
+        auto predicate =
+            PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2", 
FieldType::BIGINT,
+                                    {Literal(2l), Literal(3l), Literal(4l)});
+        CheckResult(read_schema, predicate, expected_array);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestBoolData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f3 is null, no data
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f3", 
FieldType::BOOLEAN);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f3 is not null, has data
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/3, 
/*field_name=*/"f3", FieldType::BOOLEAN);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f3 == true, has data
+        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, 
/*field_name=*/"f3",
+                                                 FieldType::BOOLEAN, 
Literal(true));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        auto predicate = PredicateBuilder::In(/*field_index=*/3, 
/*field_name=*/"f3",
+                                              FieldType::BOOLEAN, 
{Literal(false)});
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+}
+
+TEST_F(PredicatePushdownTest, TestStringData) {
+    PrepareTestData(struct_array_);
+    auto data_type = struct_array_->struct_type();
+    arrow::FieldVector fields = {data_type->GetFieldByName("f0"), 
data_type->GetFieldByName("f1"),
+                                 data_type->GetFieldByName("f2"), 
data_type->GetFieldByName("f3"),
+                                 data_type->GetFieldByName("f4")};
+    auto read_schema = arrow::schema(fields);
+    std::shared_ptr<arrow::Array> expected_array =
+        arrow::StructArray::Make(
+            {struct_array_->GetFieldByName("f0"), 
struct_array_->GetFieldByName("f1"),
+             struct_array_->GetFieldByName("f2"), 
struct_array_->GetFieldByName("f3"),
+             struct_array_->GetFieldByName("f4")},
+            fields)
+            .ValueOrDie();
+    {
+        // f0 is null, no data
+        auto predicate =
+            PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING);
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 is not null, has data
+        auto predicate =
+            PredicateBuilder::IsNotNull(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::STRING);
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f0 == apple, has data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                    Literal(FieldType::STRING, "apple", 5));
+        CheckResult(read_schema, predicate, expected_array);
+    }
+    {
+        // f0 == anything, no data
+        auto predicate =
+            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                    Literal(FieldType::STRING, "anything", 8));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 > zooooooo, no data
+        auto predicate = PredicateBuilder::GreaterThan(
+            /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
+            Literal(FieldType::STRING, "zooooooo", 8));
+        CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+    }
+    {
+        // f0 like 'ba%', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::StartsWith(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "ba", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like '%ta', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::EndsWith(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "ta", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like '%me%', has data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::Contains(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "me", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+    }
+    {
+        // f0 like 'me', no data
+        ASSERT_OK_AND_ASSIGN(const auto predicate,
+                             PredicateBuilder::Like(
+                                 /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,
+                                 Literal(FieldType::STRING, "me", 2)));
+        CheckResult(read_schema, predicate, /*expected_array=*/expected_array);

Review Comment:
   The comment says this predicate should yield "no data", but the assertion 
expects `expected_array`. Please align the comment and expected result (either 
adjust the expectation to `nullptr`, or clarify in the comment that LIKE 
predicates are intentionally not pushed down / not used for row-group pruning 
so the full data is returned).



##########
src/paimon/format/parquet/predicate_converter.cpp:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/expression.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon::parquet {
+arrow::compute::Expression PredicateConverter::AlwaysTrue() {
+    static const arrow::compute::Expression expr = 
arrow::compute::literal(true);
+    return expr;
+}
+
+Result<arrow::compute::Expression> PredicateConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, uint32_t node_count_limit) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    uint32_t node_count = 0;
+    CollectNodeCount(predicate, &node_count);
+    if (node_count > node_count_limit) {
+        return AlwaysTrue();
+    }
+    return InnerConvert(predicate);
+}
+
+void PredicateConverter::CollectNodeCount(const std::shared_ptr<Predicate>& 
predicate,
+                                          uint32_t* node_count) {
+    const auto& function_type = predicate->GetFunction().GetType();
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        if (function_type == Function::Type::IN || function_type == 
Function::Type::NOT_IN) {
+            // IN and NOT_IN will be converted to Or(Equals) and And(NotEqual)
+            *node_count += leaf_predicate->Literals().size();
+        }
+        *node_count += 1;
+        return;
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        *node_count += 1;
+        for (const auto& child : compound_predicate->Children()) {
+            CollectNodeCount(child, node_count);
+        }
+    }
+}
+
+Result<arrow::compute::Expression> PredicateConverter::InnerConvert(
+    const std::shared_ptr<Predicate>& predicate) {
+    if (!predicate) {
+        return AlwaysTrue();
+    }
+    if (auto leaf_predicate = 
std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        return ConvertLeaf(leaf_predicate);
+    }
+    if (auto compound_predicate = 
std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        return ConvertCompound(compound_predicate);
+    }
+    return Status::Invalid("invalid predicate, must be leaf or compound");
+}
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertCompound(
+    const std::shared_ptr<CompoundPredicate>& compound_predicate) {
+    const auto& children = compound_predicate->Children();
+    const auto& function = compound_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::AND: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::OR: {
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(children.size());
+            for (const auto& child : children) {
+                PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression sub_expr, 
InnerConvert(child));
+                sub_exprs.push_back(std::move(sub_expr));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", 
static_cast<int32_t>(function_type)));
+    }
+}
+
+Status PredicateConverter::CheckLiteralNotEmpty(const std::vector<Literal>& 
literals,
+                                                const Function& function,
+                                                const std::string& field_name) 
{
+    if (literals.empty()) {
+        return Status::Invalid(fmt::format("predicate [{}] need literal on 
field {}",
+                                           function.ToString(), field_name));
+    }
+    return Status::OK();
+}
+
+#define CONVERT_TO_ARROW_LITERAL(LITERAL)                                      
           \
+    auto arrow_literal_result = ConvertToArrowLiteral(LITERAL);                
           \
+    if (!arrow_literal_result.ok() && 
arrow_literal_result.status().IsNotImplemented()) { \
+        return AlwaysTrue();                                                   
           \
+    }                                                                          
           \
+    if (!arrow_literal_result.ok()) {                                          
           \
+        return arrow_literal_result.status();                                  
           \
+    }                                                                          
           \
+    auto arrow_literal = std::move(arrow_literal_result).value();
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertLeaf(
+    const std::shared_ptr<LeafPredicate>& leaf_predicate) {
+    const auto& field_name = leaf_predicate->FieldName();
+    const auto& literals = leaf_predicate->Literals();
+    const auto& function = leaf_predicate->GetFunction();
+    auto function_type = function.GetType();
+    switch (function_type) {
+        case Function::Type::IS_NULL: {
+            return 
arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                           /*nan_is_null=*/false);
+        }
+        case Function::Type::IS_NOT_NULL: {
+            return arrow::compute::not_(
+                arrow::compute::is_null(arrow::compute::field_ref(field_name),
+                                        /*nan_is_null=*/false));
+        }
+        case Function::Type::EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::NOT_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::not_equal(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater(arrow::compute::field_ref(field_name), arrow_literal);
+        }
+        case Function::Type::GREATER_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::greater_equal(arrow::compute::field_ref(field_name),
+                                                 arrow_literal);
+        }
+        case Function::Type::LESS_THAN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return arrow::compute::less(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        case Function::Type::LESS_OR_EQUAL: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            CONVERT_TO_ARROW_LITERAL(literals[0]);
+            return 
arrow::compute::less_equal(arrow::compute::field_ref(field_name), 
arrow_literal);
+        }
+        // Noted that: java paimon don't support pushdown IN and NOT_IN to 
parquet
+        case Function::Type::IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // in convert to Or(Equals)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                sub_exprs.push_back(
+                    
arrow::compute::equal(arrow::compute::field_ref(field_name), arrow_literal));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        case Function::Type::NOT_IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            // not in convert to And(NotEqual)
+            std::vector<arrow::compute::Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                
sub_exprs.push_back(arrow::compute::not_equal(arrow::compute::field_ref(field_name),
+                                                              arrow_literal));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::STARTS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("starts_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::ENDS_WITH: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("ends_with", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::CONTAINS: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_substring", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        case Function::Type::LIKE: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, 
field_name));
+            auto options = 
std::make_shared<arrow::compute::MatchSubstringOptions>(
+                literals[0].GetValue<std::string>());
+            return arrow::compute::call("match_like", 
{arrow::compute::field_ref(field_name)},
+                                        options);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", 
static_cast<int32_t>(function_type)));
+    }
+    return Status::OK();
+}
+
+Result<arrow::compute::Expression> PredicateConverter::ConvertToArrowLiteral(
+    const Literal& literal) {
+    auto literal_type = literal.GetType();
+    if (literal.IsNull()) {
+        return Status::Invalid("literal cannot be null in predicate");
+    }
+    switch (literal_type) {
+        case FieldType::BOOLEAN:
+            return 
arrow::compute::literal(std::make_shared<arrow::BooleanScalar>(
+                static_cast<bool>(literal.GetValue<bool>())));
+        case FieldType::TINYINT:
+            return arrow::compute::literal(std::make_shared<arrow::Int8Scalar>(
+                static_cast<int8_t>(literal.GetValue<int8_t>())));
+        case FieldType::SMALLINT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int16Scalar>(
+                static_cast<int16_t>(literal.GetValue<int16_t>())));
+        case FieldType::INT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int32Scalar>(
+                static_cast<int32_t>(literal.GetValue<int32_t>())));
+        case FieldType::DATE:
+            return 
arrow::compute::literal(std::make_shared<arrow::Date32Scalar>(
+                static_cast<int32_t>(literal.GetValue<int32_t>())));
+        case FieldType::BIGINT:
+            return 
arrow::compute::literal(std::make_shared<arrow::Int64Scalar>(
+                static_cast<int64_t>(literal.GetValue<int64_t>())));
+        case FieldType::FLOAT:
+            return 
arrow::compute::literal(std::make_shared<arrow::FloatScalar>(
+                static_cast<float>(literal.GetValue<float>())));
+        case FieldType::DOUBLE:
+            return 
arrow::compute::literal(std::make_shared<arrow::DoubleScalar>(
+                static_cast<double>(literal.GetValue<double>())));
+        case FieldType::STRING: {
+            auto str = literal.GetValue<std::string>();
+            return 
arrow::compute::literal(std::make_shared<arrow::StringScalar>(str));
+        }
+        case FieldType::DECIMAL: {
+            auto decimal = literal.GetValue<Decimal>();
+            return 
arrow::compute::literal(std::make_shared<arrow::Decimal128Scalar>(
+                arrow::Decimal128(decimal.HighBits(), decimal.LowBits()),
+                arrow::decimal128(decimal.Precision(), decimal.Scale())));
+        }
+        // TODO(lisizhuo.lsz): java paimon does not support BINARY, TIMESTAMP 
and DECIMAL
+        case FieldType::TIMESTAMP:
+        case FieldType::BINARY:
+            return Status::NotImplemented(
+                "Not support Binary and Timestamp predicate push down in 
parquet file "
+                "format");

Review Comment:
   The TODO/comment says Java Paimon does not support `DECIMAL`, but `DECIMAL` 
is handled above and is supported (`case FieldType::DECIMAL`). Please 
update/remove the comment to reflect the current behavior (unsupported types 
here are `BINARY` and `TIMESTAMP`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to