This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f551462 feat(parquet): introduce stats extraction and predicate
pushdown (#53)
f551462 is described below
commit f5514628dceb195479110cb6bed14b8ae8bfc09f
Author: Zhang Jiawei <[email protected]>
AuthorDate: Mon Jun 8 11:58:49 2026 +0800
feat(parquet): introduce stats extraction and predicate pushdown (#53)
---
.../format/parquet/parquet_stats_extractor.cpp | 340 +++++++++
.../format/parquet/parquet_stats_extractor.h | 82 +++
.../parquet/parquet_stats_extractor_test.cpp | 351 +++++++++
src/paimon/format/parquet/predicate_converter.cpp | 300 ++++++++
src/paimon/format/parquet/predicate_converter.h | 74 ++
.../format/parquet/predicate_converter_test.cpp | 396 ++++++++++
.../format/parquet/predicate_pushdown_test.cpp | 813 +++++++++++++++++++++
7 files changed, 2356 insertions(+)
diff --git a/src/paimon/format/parquet/parquet_stats_extractor.cpp
b/src/paimon/format/parquet/parquet_stats_extractor.cpp
new file mode 100644
index 0000000..8f8b82a
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor.cpp
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+
+#include <cassert>
+#include <optional>
+#include <string_view>
+#include <unordered_map>
+
+#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/parquet/parquet_schema_util.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+
+namespace paimon {
+class MemoryPool;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+namespace {
+
+/// Reads the min/max pair out of a typed Parquet statistics object.
+///
+/// Both slots of the returned pair stay empty when `typed_stats` is null or reports that
+/// it carries no min/max.
+template <typename ParquetTypedStatsType, typename R = typename ParquetTypedStatsType::T>
+std::pair<std::optional<R>, std::optional<R>> CollectMinMaxStats(
+    const std::shared_ptr<ParquetTypedStatsType>& typed_stats) {
+    std::pair<std::optional<R>, std::optional<R>> bounds;
+    if (typed_stats == nullptr || !typed_stats->HasMinMax()) {
+        return bounds;
+    }
+    bounds.first = typed_stats->min();
+    bounds.second = typed_stats->max();
+    return bounds;
+}
+
+/// Narrows the min/max of INT32-backed statistics to the smaller integer type `Narrow`.
+///
+/// Both slots stay empty when `typed_stats` is null or reports that it carries no min/max.
+template <typename Narrow>
+std::pair<std::optional<Narrow>, std::optional<Narrow>> CollectNarrowedMinMaxStats(
+    const std::shared_ptr<::parquet::Int32Statistics>& typed_stats) {
+    std::optional<Narrow> min;
+    std::optional<Narrow> max;
+    if (typed_stats && typed_stats->HasMinMax()) {
+        min = static_cast<Narrow>(typed_stats->min());
+        max = static_cast<Narrow>(typed_stats->max());
+    }
+    return std::make_pair(min, max);
+}
+
+/// Converts the (merged) Parquet statistics of one primitive column into ColumnStats.
+///
+/// @param stats statistics of the column; may be null, in which case min, max and null
+///     count are all reported as absent.
+/// @param primitive_node Parquet schema node of the column; its Arrow type selects the
+///     conversion.
+/// @param write_type Arrow type the column was written with. Only consulted for
+///     non-nanosecond timestamp columns, where it supplies the precision.
+/// @param pool memory pool used for the temporary byte buffers of binary-encoded decimals.
+/// @return the converted statistics, or Status::Invalid when the column type is not
+///     supported or `write_type` is not a timestamp type for a timestamp column.
+Result<std::unique_ptr<ColumnStats>> ConvertStatsToColumnStats(
+    const std::shared_ptr<::parquet::Statistics>& stats,
+    const std::shared_ptr<::parquet::schema::PrimitiveNode>& primitive_node,
+    const std::shared_ptr<arrow::DataType>& write_type, const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::DataType> data_type,
+                                      GetArrowType(*primitive_node));
+    auto id = data_type->id();
+
+    std::optional<int64_t> null_count;
+    if (stats && stats->HasNullCount()) {
+        null_count = stats->null_count();
+    }
+    switch (id) {
+        case arrow::Type::BOOL: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::BoolStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateBooleanColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT8: {
+            // 8-bit integers are read through INT32 statistics and narrowed.
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = CollectNarrowedMinMaxStats<int8_t>(typed_stats);
+            return ColumnStats::CreateTinyIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT16: {
+            // 16-bit integers are read through INT32 statistics and narrowed.
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = CollectNarrowedMinMaxStats<int16_t>(typed_stats);
+            return ColumnStats::CreateSmallIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT32: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::INT64: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateBigIntColumnStats(min, max, null_count);
+        }
+        case arrow::Type::FLOAT: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::FloatStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateFloatColumnStats(min, max, null_count);
+        }
+        case arrow::Type::DOUBLE: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::DoubleStatistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            return ColumnStats::CreateDoubleColumnStats(min, max, null_count);
+        }
+        case arrow::Type::STRING: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::ByteArrayStatistics>(stats);
+            std::optional<std::string> min;
+            std::optional<std::string> max;
+            if (typed_stats && typed_stats->HasMinMax()) {
+                // Copy the bytes: the ByteArray values only view memory owned by the stats.
+                min = std::string(std::string_view{typed_stats->min()});
+                max = std::string(std::string_view{typed_stats->max()});
+            }
+            return ColumnStats::CreateStringColumnStats(min, max, null_count);
+        }
+        case arrow::Type::BINARY: {
+            // Binary columns only report the null count; min/max are never filled in.
+            return ColumnStats::CreateStringColumnStats(std::nullopt, std::nullopt, null_count);
+        }
+        case arrow::Type::DATE32: {
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats<::parquet::Int32Statistics>(typed_stats);
+            return ColumnStats::CreateDateColumnStats(min, max, null_count);
+        }
+        case arrow::Type::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<::arrow::TimestampType>(data_type);
+            if (timestamp_type->unit() == arrow::TimeUnit::type::NANO) {
+                // int96 does not have statistics
+                return ColumnStats::CreateTimestampColumnStats(
+                    std::nullopt, std::nullopt, std::nullopt, Timestamp::MAX_PRECISION);
+            }
+            // The downcast below is unchecked in release builds, so reject a missing or
+            // non-timestamp write type explicitly instead of invoking undefined behaviour.
+            if (!write_type || write_type->id() != arrow::Type::TIMESTAMP) {
+                return Status::Invalid(fmt::format(
+                    "cannot fetch statistics, write type {} of timestamp column is not a "
+                    "timestamp type",
+                    write_type ? write_type->ToString() : std::string("null")));
+            }
+            auto typed_stats =
+                arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+            auto [min, max] = CollectMinMaxStats(typed_stats);
+            // When the write type is ts(second), the data type in the parquet file is
+            // ts(milli), so the precision has to be taken from the write type.
+            auto write_ts_type =
+                arrow::internal::checked_pointer_cast<::arrow::TimestampType>(write_type);
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(write_ts_type);
+            if (!min || !max) {
+                return ColumnStats::CreateTimestampColumnStats(std::nullopt, std::nullopt,
+                                                               null_count, precision);
+            }
+            auto src_time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+            auto [milli_min, nano_min] = DateTimeUtils::TimestampConverter(
+                min.value(), src_time_type, DateTimeUtils::TimeType::MILLISECOND,
+                DateTimeUtils::TimeType::NANOSECOND);
+            auto [milli_max, nano_max] = DateTimeUtils::TimestampConverter(
+                max.value(), src_time_type, DateTimeUtils::TimeType::MILLISECOND,
+                DateTimeUtils::TimeType::NANOSECOND);
+            return ColumnStats::CreateTimestampColumnStats(Timestamp(milli_min, nano_min),
+                                                           Timestamp(milli_max, nano_max),
+                                                           null_count, precision);
+        }
+        case arrow::Type::DECIMAL128: {
+            auto decimal_type =
+                arrow::internal::checked_pointer_cast<::arrow::Decimal128Type>(data_type);
+            int32_t precision = decimal_type->precision();
+            int32_t scale = decimal_type->scale();
+            std::optional<Decimal> min_value;
+            std::optional<Decimal> max_value;
+            // The physical type of the column decides how the unscaled value is read.
+            const auto physical_type = primitive_node->physical_type();
+            if (physical_type == ::parquet::Type::INT32) {
+                auto typed_stats =
+                    arrow::internal::checked_pointer_cast<::parquet::Int32Statistics>(stats);
+                if (typed_stats && typed_stats->HasMinMax()) {
+                    min_value = Decimal(precision, scale, typed_stats->min());
+                    max_value = Decimal(precision, scale, typed_stats->max());
+                }
+            } else if (physical_type == ::parquet::Type::INT64) {
+                auto typed_stats =
+                    arrow::internal::checked_pointer_cast<::parquet::Int64Statistics>(stats);
+                if (typed_stats && typed_stats->HasMinMax()) {
+                    min_value = Decimal(precision, scale, typed_stats->min());
+                    max_value = Decimal(precision, scale, typed_stats->max());
+                }
+            } else if (physical_type == ::parquet::Type::FIXED_LEN_BYTE_ARRAY ||
+                       physical_type == ::parquet::Type::BYTE_ARRAY) {
+                if (stats && stats->HasMinMax()) {
+                    Bytes encode_min(stats->EncodeMin(), pool.get());
+                    min_value = Decimal::FromUnscaledBytes(precision, scale, &encode_min);
+                    Bytes encode_max(stats->EncodeMax(), pool.get());
+                    max_value = Decimal::FromUnscaledBytes(precision, scale, &encode_max);
+                }
+            }
+            return ColumnStats::CreateDecimalColumnStats(min_value, max_value, null_count,
+                                                         precision, scale);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("cannot fetch statistics, invalid type {}", data_type->ToString()));
+    }
+}
+
+/// Folds `stats` into the running statistics kept for `column_name` in `merged_stats`.
+///
+/// The first statistics object seen for a column is stored as-is; every later one is merged
+/// into the stored object through the `Merge` of the typed statistics class `T`.
+template <typename T>
+void MergeTypedStats(
+    const std::string& column_name, const std::shared_ptr<::parquet::Statistics>& stats,
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>>* merged_stats) {
+    std::shared_ptr<::parquet::Statistics>& accumulated = (*merged_stats)[column_name];
+    if (accumulated) {
+        auto typed_accumulated = arrow::internal::checked_pointer_cast<T>(accumulated);
+        auto typed_incoming = arrow::internal::checked_pointer_cast<T>(stats);
+        typed_accumulated->Merge(*typed_incoming);
+        return;
+    }
+    // Nothing recorded for this column yet: start from the incoming statistics.
+    accumulated = stats;
+}
+
+/// Merges `stats` into the entry of `column_name` in `merged_stats`, dispatching on the
+/// physical Parquet type to the matching typed statistics class.
+///
+/// @return Status::OK on success, Status::Invalid for a physical type without a merge path.
+Status MergeStats(
+    const std::string& column_name, const std::shared_ptr<::parquet::Statistics>& stats,
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>>* merged_stats) {
+    const auto physical_type = stats->physical_type();
+    switch (physical_type) {
+        case ::parquet::Type::BOOLEAN:
+            MergeTypedStats<::parquet::BoolStatistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::INT32:
+            MergeTypedStats<::parquet::Int32Statistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::INT64:
+            MergeTypedStats<::parquet::Int64Statistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::FLOAT:
+            MergeTypedStats<::parquet::FloatStatistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::DOUBLE:
+            MergeTypedStats<::parquet::DoubleStatistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::BYTE_ARRAY:
+            MergeTypedStats<::parquet::ByteArrayStatistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
+            MergeTypedStats<::parquet::FLBAStatistics>(column_name, stats, merged_stats);
+            return Status::OK();
+        default:
+            break;
+    }
+    // Every remaining physical type has no typed statistics class handled above.
+    return Status::Invalid(fmt::format("Unsupported parquet type {} for statistics merge",
+                                       ::parquet::TypeToString(physical_type)));
+}
+
+} // namespace
+
+/// Opens the Parquet file at `path`, merges the column-chunk statistics of all row groups
+/// and converts them into one ColumnStats entry per top-level field of the file schema.
+///
+/// Nested (group) fields get a placeholder entry without statistics. A primitive column
+/// only reports statistics when every row group provides them: statistics that cover just
+/// part of the file would describe bounds and null counts that are too small.
+///
+/// @param file_system file system used to open `path`.
+/// @param path location of the Parquet file.
+/// @param pool memory pool backing the Parquet reader and temporary buffers.
+/// @return the per-field statistics together with a FileInfo built from the row count of
+///     the file, or an error status when the file cannot be read, a statistics type is
+///     unsupported, or a primitive field has no counterpart in the write schema.
+Result<std::pair<ColumnStatsVector, FormatStatsExtractor::FileInfo>>
+ParquetStatsExtractor::ExtractWithFileInfo(const std::shared_ptr<FileSystem>& file_system,
+                                           const std::string& path,
+                                           const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream, file_system->Open(path));
+    assert(input_stream);
+    PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length());
+    std::shared_ptr<arrow::MemoryPool> parquet_memory_pool = GetArrowPool(pool);
+    auto parquet_input_file = std::make_shared<ArrowInputStreamAdapter>(
+        std::move(input_stream), parquet_memory_pool, file_length);
+    ::parquet::ReaderProperties read_properties(parquet_memory_pool.get());
+    read_properties.enable_buffered_stream();
+    ::parquet::arrow::FileReaderBuilder file_reader_builder;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(parquet_input_file, read_properties));
+
+    std::shared_ptr<::parquet::FileMetaData> file_metadata =
+        file_reader_builder.raw_reader()->metadata();
+    const auto* root_node = file_metadata->schema()->group_node();
+    const int32_t field_count = root_node->field_count();
+    const int32_t row_group_count = file_metadata->num_row_groups();
+    const int32_t column_count = file_metadata->num_columns();
+
+    std::unordered_map<std::string, std::shared_ptr<::parquet::Statistics>> merged_stats;
+    // Per column: number of row groups that contributed statistics to merged_stats.
+    std::unordered_map<std::string, int32_t> stats_chunk_counts;
+
+    for (int32_t row_group_idx = 0; row_group_idx < row_group_count; ++row_group_idx) {
+        // Build the row group metadata once instead of once per column.
+        auto row_group = file_metadata->RowGroup(row_group_idx);
+        for (int32_t col_idx = 0; col_idx < column_count; ++col_idx) {
+            auto column_chunk = row_group->ColumnChunk(col_idx);
+            if (!column_chunk->is_stats_set()) {
+                continue;
+            }
+            auto stats = column_chunk->statistics();
+            std::string column_name = column_chunk->path_in_schema()->ToDotString();
+            PAIMON_RETURN_NOT_OK(MergeStats(column_name, stats, &merged_stats));
+            ++stats_chunk_counts[column_name];
+        }
+    }
+
+    ColumnStatsVector result_stats;
+    result_stats.reserve(field_count);
+
+    for (int32_t field_idx = 0; field_idx < field_count; ++field_idx) {
+        auto node = root_node->field(field_idx);
+        if (node->is_group()) {
+            // nested types do not have parquet stats
+            const auto& logical_type = node->logical_type();
+            FieldType nested_type = FieldType::UNKNOWN;
+            if (logical_type->is_list()) {
+                nested_type = FieldType::ARRAY;
+            } else if (logical_type->is_map()) {
+                nested_type = FieldType::MAP;
+            } else if (logical_type->is_none()) {
+                nested_type = FieldType::STRUCT;
+            }
+            result_stats.push_back(ColumnStats::CreateNestedColumnStats(nested_type, std::nullopt));
+            continue;
+        }
+
+        // The write schema is indexed by file field position below; fail cleanly instead of
+        // reading past its end (or through a null schema).
+        if (!write_schema_ || field_idx >= write_schema_->num_fields()) {
+            return Status::Invalid(fmt::format(
+                "cannot fetch statistics, field {} at index {} of file {} has no counterpart "
+                "in the write schema",
+                node->name(), field_idx, path));
+        }
+
+        auto primitive_node =
+            arrow::internal::checked_pointer_cast<::parquet::schema::PrimitiveNode>(node);
+        assert(primitive_node != nullptr);
+
+        // Use the merged statistics only when every row group contributed to them.
+        std::shared_ptr<::parquet::Statistics> parquet_stats;
+        auto stats_iter = merged_stats.find(node->name());
+        auto count_iter = stats_chunk_counts.find(node->name());
+        if (stats_iter != merged_stats.end() && count_iter != stats_chunk_counts.end() &&
+            count_iter->second == row_group_count) {
+            parquet_stats = stats_iter->second;
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr<ColumnStats> col_stats,
+            ConvertStatsToColumnStats(parquet_stats, primitive_node,
+                                      write_schema_->field(field_idx)->type(), pool));
+        result_stats.push_back(std::move(col_stats));
+    }
+    return std::make_pair(std::move(result_stats), FileInfo(file_metadata->num_rows()));
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_stats_extractor.h
b/src/paimon/format/parquet/parquet_stats_extractor.h
new file mode 100644
index 0000000..0fc852b
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/format_stats_extractor.h"
+#include "paimon/format/parquet/parquet_schema_util.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "parquet/metadata.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+class FileSystem;
+class MemoryPool;
+} // namespace paimon
+
+namespace parquet::schema {
+class GroupNode;
+class Node;
+class PrimitiveNode;
+} // namespace parquet::schema
+
+namespace paimon::parquet {
+
+/// FormatStatsExtractor implementation that reads column statistics from a Parquet file.
+class ParquetStatsExtractor : public FormatStatsExtractor {
+ public:
+    /// @param write_schema Arrow schema the file was written with; the type of the field at
+    ///     the same index is passed along when converting each primitive column.
+    explicit ParquetStatsExtractor(const std::shared_ptr<arrow::Schema>& write_schema)
+        : write_schema_(write_schema) {}
+
+    /// Extracts the per-field statistics of the file at `path`, dropping the file info
+    /// that ExtractWithFileInfo returns alongside them.
+    Result<ColumnStatsVector> Extract(const std::shared_ptr<FileSystem>& file_system,
+                                      const std::string& path,
+                                      const std::shared_ptr<MemoryPool>& pool) override {
+        PAIMON_ASSIGN_OR_RAISE(auto result, ExtractWithFileInfo(file_system, path, pool));
+        // `result` is a local pair: move the vector out instead of copying every entry.
+        return std::move(result.first);
+    }
+
+    /// Extracts the per-field statistics of the file at `path` together with its FileInfo.
+    Result<std::pair<ColumnStatsVector, FileInfo>> ExtractWithFileInfo(
+        const std::shared_ptr<FileSystem>& file_system, const std::string& path,
+        const std::shared_ptr<MemoryPool>& pool) override;
+
+ private:
+    /// Arrow schema used when the file was written.
+    std::shared_ptr<arrow::Schema> write_schema_;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_stats_extractor_test.cpp
b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp
new file mode 100644
index 0000000..4dc7fbe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+
+#include <cstddef>
+#include <map>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/compare.h"
+#include "arrow/io/file.h"
+#include "arrow/ipc/api.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/stats/simple_stats_converter.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace paimon::parquet::test {
+
+/// Fixture that writes Arrow data into a Parquet file inside a unique test directory and
+/// checks the statistics ParquetStatsExtractor reads back from that file.
+class ParquetStatsExtractorTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    /// Writes `input` (a JSON array of rows matching `fields`) to a new Parquet file, then
+    /// extracts its statistics and compares them with the expectations.
+    ///
+    /// @param fields top-level fields of the written schema.
+    /// @param input JSON rows for a struct array built from `fields`.
+    /// @param expected_stats expected ColumnStats::ToString() output, one entry per field.
+    /// @param expect_row_count expected row count reported by the extracted file info.
+    void CheckStats(const arrow::FieldVector& fields, const std::string& input,
+                    const std::vector<std::string>& expected_stats, int64_t expect_row_count) {
+        auto arrow_schema = arrow::schema(fields);
+        auto struct_type = arrow::struct_(fields);
+        std::shared_ptr<arrow::MemoryPool> pool = GetArrowPool(GetDefaultPool());
+        std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+        std::string file_name;
+        ASSERT_TRUE(UUID::Generate(&file_name));
+        std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs->Create(file_path, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.enable_store_decimal_as_integer();
+        ASSERT_OK_AND_ASSIGN(auto format_writer,
+                             ParquetFormatWriter::Create(out, arrow_schema, builder.build(),
+                                                         DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE,
+                                                         pool));
+        // Report malformed JSON input as a test failure instead of aborting the process.
+        auto array_result = arrow::ipc::internal::json::ArrayFromJSON(struct_type, input);
+        ASSERT_TRUE(array_result.ok()) << array_result.status().ToString();
+        auto array = array_result.ValueOrDie();
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        ASSERT_OK(format_writer->AddBatch(arrow_array.get()));
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+
+        ParquetStatsExtractor stats_extractor(arrow_schema);
+        ASSERT_OK_AND_ASSIGN(auto result,
+                             stats_extractor.ExtractWithFileInfo(fs, file_path, GetDefaultPool()));
+        auto& col_stats_vec = result.first;
+        ASSERT_EQ(fields.size(), col_stats_vec.size());
+        ASSERT_EQ(col_stats_vec.size(), expected_stats.size());
+        for (size_t i = 0; i < expected_stats.size(); i++) {
+            ASSERT_EQ(expected_stats[i], col_stats_vec[i]->ToString());
+        }
+        auto row_count = result.second.GetRowCount();
+        ASSERT_EQ(row_count, expect_row_count);
+    }
+
+ protected:
+    // TEST_F bodies are members of classes derived from this fixture, so state they touch
+    // has to be protected rather than private.
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+};
+
+// Runs CheckStats four times over one eight-field schema (struct, string, int32, boolean,
+// nanosecond timestamp, decimal128(23, 2), float, double). The expectations show no
+// statistics for the struct and the nanosecond timestamp fields; the last data set mixes
+// in null values.
+TEST_F(ParquetStatsExtractorTest, TestExtractStats) {
+ arrow::FieldVector fields = {
+ arrow::field("col0", arrow::struct_({arrow::field("col2",
arrow::boolean()),
+ arrow::field("col3",
arrow::int64())})),
+ arrow::field("col1", arrow::utf8()),
+ arrow::field("col2", arrow::int32()),
+ arrow::field("col3", arrow::boolean()),
+ arrow::field("col4", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("col5", arrow::decimal128(23, 2)),
+ arrow::field("col6", arrow::float32()),
+ arrow::field("col7", arrow::float64()),
+ };
+ {
+ std::string data_str = R"([
+ [[true, 0], "str0", 100, true, "3970-01-01 00:00:00.000", "0.22",
1.1, 12.2],
+ [[false, 1], "str1", 101, false, "3970-01-01 00:02:03.999",
"0.28", 2.2, 12.2],
+ [[false, 2], "str2", 102, true, "3455-01-01 00:02:03.000",
"1234567890123456.00", 3.3, 13.2]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count null",
+ "min str0, max str2, null count 0",
+ "min 100, max 102, null count 0",
+ "min false, max true, null count 0",
+ "min null, max null, null count null",
+ "min 0.22, max 1234567890123456.00, null count 0",
+ "min 1.1, max 3.3, null count 0",
+ "min 12.2, max 13.2, null count 0",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/3);
+ }
+ {
+ std::string data_str = R"([
+ [[true, 0], "str0", 100, true, "3970-01-01 00:00:00.000", "0.22",
1.1, 12.2],
+ [[false, 1], "str1", 101, true, "3970-01-01 00:02:03.999", "0.28",
2.2, 12.2],
+ [[false, 2], "str2", 102, true, "3455-01-01 00:02:03.000",
"1234567890123456.00", 3.3, 13.2]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count null",
+ "min str0, max str2, null count 0",
+ "min 100, max 102, null count 0",
+ "min true, max true, null count 0",
+ "min null, max null, null count null",
+ "min 0.22, max 1234567890123456.00, null count 0",
+ "min 1.1, max 3.3, null count 0",
+ "min 12.2, max 13.2, null count 0",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/3);
+ }
+
+ {
+ std::string data_str = R"([
+ [[true, 0], "str0", 100, true, "1970-01-01 00:00:00.000", "0.22",
1.1, 12.2],
+ [[false, 1], "str1", 101, false, "1970-01-01 00:02:03.999",
"0.28", 2.2, 12.2],
+ [[false, 2], "str2", 102, true, "1985-01-01 00:02:03.000", "1.00",
3.3, 13.2]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count null", "min str0, max str2, null
count 0",
+ "min 100, max 102, null count 0", "min false, max true, null
count 0",
+ "min null, max null, null count null", "min 0.22, max 1.00, null
count 0",
+ "min 1.1, max 3.3, null count 0", "min 12.2, max 13.2, null
count 0",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/3);
+ }
+ {
+ std::string data_str = R"([
+ [[true, 0], null, 100, null, "3970-01-01 00:00:00.000", null,
null, null],
+ [[false, 1], "str1", 101, null, "3970-01-01 00:02:03.999", null,
null, 2.2],
+ [null, "str2", null, true, "3455-01-01 00:02:03.000", null, 1.1,
3.3]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count null", "min str1, max str2, null
count 1",
+ "min 100, max 101, null count 1", "min true, max true, null
count 2",
+ "min null, max null, null count null", "min null, max null, null
count 3",
+ "min 1.1, max 1.1, null count 2", "min 2.2, max 3.3, null
count 1",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/3);
+ }
+}
+
+// Checks min, max and null count for boolean, int8/16/32/64, float, double, string and
+// binary columns, including an int32 column that only holds nulls. The binary column is
+// expected to report a null count but no min/max.
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsSimpleType) {
+ arrow::FieldVector fields = {
+ arrow::field("f0", arrow::boolean()), arrow::field("f1",
arrow::int8()),
+ arrow::field("f2", arrow::int16()), arrow::field("f3",
arrow::int32()),
+ arrow::field("field_null", arrow::int32()), arrow::field("f4",
arrow::int64()),
+ arrow::field("f5", arrow::float32()), arrow::field("f6",
arrow::float64()),
+ arrow::field("f7", arrow::utf8()), arrow::field("f8",
arrow::binary())};
+
+ std::string data_str = R"([
+ [true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659,
"20250327", "banana"],
+ [false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658,
"20250327", "dog"],
+ [null, 1, 32767, 2147483647, null, null, 2.1, 3.141592657, null,
"lucy"],
+ [true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657,
"20250326", null]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min false, max true, null count 1",
+ "min -2, max 1, null count 0",
+ "min -32768, max 32767, null count 0",
+ "min -2147483648, max 2147483647, null count 1",
+ "min null, max null, null count 4",
+ "min -4294967298, max 4294967296, null count 1",
+ "min 0.5, max 2.1, null count 0",
+ "min 1.141592659, max 3.141592657, null count 0",
+ "min 20250326, max 20250327, null count 1",
+ "min null, max null, null count 1",
+ };
+ CheckStats(fields, data_str, expected_stats_str, /*expect_row_count=*/4);
+}
+
+// Checks a schema mixing map, list, struct, nanosecond timestamp, date32 and
+// decimal128(2, 2) fields. Only the date and decimal fields are expected to report
+// statistics; the other four are expected to report none.
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsComplexType) {
+ arrow::FieldVector fields = {
+ arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())),
+ arrow::field("f2", arrow::list(arrow::float32())),
+ arrow::field("f3", arrow::struct_({arrow::field("f0",
arrow::boolean()),
+ arrow::field("f1",
arrow::int64())})),
+ arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f5", arrow::date32()),
+ arrow::field("f6", arrow::decimal128(2, 2))};
+
+ std::string data_str = R"([
+ [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456,
"0.22"],
+ [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999999", 24,
"0.28"],
+ [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123123",
2456, "0.22"],
+ [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222],
"1970-01-01 00:02:03.123123", 245, "0.12"],
+ [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0",
24, "0.78"],
+ [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01
00:00:00.123123", 24, "0.78"]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count null", "min null, max null, null count
null",
+ "min null, max null, null count null", "min null, max null, null count
null",
+ "min 24, max 2456, null count 0", "min 0.12, max 0.78, null count
0",
+ };
+ CheckStats(fields, data_str, expected_stats_str, /*expect_row_count=*/6);
+}
+
+// Writes a single row whose fields are all null across every listed type, extracts the
+// statistics, converts them with SimpleStatsConverter::ToBinary and compares the hash codes
+// of the resulting min and max rows against a fixed value.
+TEST_F(ParquetStatsExtractorTest, TestNullForAllType) {
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ arrow::FieldVector fields = {
+ arrow::field("f0", arrow::boolean()),
+ arrow::field("f1", arrow::int8()),
+ arrow::field("f2", arrow::int16()),
+ arrow::field("f3", arrow::int32()),
+ arrow::field("f4", arrow::int64()),
+ arrow::field("f5", arrow::float32()),
+ arrow::field("f6", arrow::float64()),
+ arrow::field("f7", arrow::utf8()),
+ arrow::field("f8", arrow::binary()),
+ arrow::field("f9", arrow::map(arrow::int8(), arrow::int16())),
+ arrow::field("f10", arrow::list(arrow::float32())),
+ arrow::field("f11", arrow::struct_({arrow::field("f0",
arrow::boolean()),
+ arrow::field("f1",
arrow::int64())})),
+ arrow::field("f12", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f13", arrow::date32()),
+ arrow::field("f14", arrow::decimal128(2, 2)),
+ arrow::field("f15", arrow::decimal128(30, 2)),
+ arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND,
timezone)),
+ arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI,
timezone)),
+ arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO,
timezone)),
+ arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO,
timezone)),
+ };
+ auto schema = std::make_shared<arrow::Schema>(fields);
+ std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+ std::string file_name = dir_->Str() + "/test.parquet";
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+ fs->Create(file_name, /*overwrite=*/false));
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::MemoryPool> arrow_pool = GetArrowPool(pool);
+ ::parquet::WriterProperties::Builder builder;
+ builder.enable_store_decimal_as_integer();
+ ASSERT_OK_AND_ASSIGN(
+ auto format_writer,
+ ParquetFormatWriter::Create(out, schema, builder.build(),
+ DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE,
arrow_pool));
+ auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+
[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]
+ ])")
+ .ValueOrDie());
+ ArrowArray c_array;
+ ASSERT_TRUE(arrow::ExportArray(*src_array, &c_array).ok());
+ ASSERT_OK(format_writer->AddBatch(&c_array));
+ ASSERT_OK(format_writer->Flush());
+ ASSERT_OK(format_writer->Finish());
+ ASSERT_OK(out->Flush());
+ ASSERT_OK(out->Close());
+
+ auto extractor = std::make_shared<ParquetStatsExtractor>(schema);
+ ASSERT_OK_AND_ASSIGN(auto ret, extractor->ExtractWithFileInfo(fs,
file_name, pool));
+
+ auto column_stats = ret.first;
+ auto file_info = ret.second;
+ ASSERT_EQ(src_array->length(), file_info.GetRowCount());
+ ASSERT_OK_AND_ASSIGN(auto stats,
SimpleStatsConverter::ToBinary(column_stats, pool.get()));
+ // test compatible with java
+ ASSERT_EQ(stats.min_values_.HashCode(), 0xf890741a);
+ ASSERT_EQ(stats.max_values_.HashCode(), 0xf890741a);
+}
+
+// Checks timestamp statistics for second, millisecond, microsecond and nanosecond units,
+// each with and without a time zone. The nanosecond columns are expected to report no
+// statistics at all; the second data set holds a single all-null row.
+TEST_F(ParquetStatsExtractorTest, TestExtractStatsTimestampType) {
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ arrow::FieldVector fields = {
+ arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND,
timezone)),
+ arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI,
timezone)),
+ arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO,
timezone)),
+ arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO,
timezone)),
+ };
+
+ {
+ std::string data_str = R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02",
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null,
"1970-01-01 00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01
00:00:00.004", "1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null,
null, "1970-01-01 00:00:06",
null, "1970-01-01 00:00:00.000006", null]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min 1970-01-01 00:00:01.000000000, max 1970-01-01
00:00:05.000000000, null count 0",
+ "min 1970-01-01 00:00:00.001000000, max 1970-01-01
00:00:00.005000000, null count 0",
+ "min 1970-01-01 00:00:00.000001000, max 1970-01-01
00:00:00.000001000, null count 2",
+ "min null, max null, null count null",
+ "min 1970-01-01 00:00:02.000000000, max 1970-01-01
00:00:06.000000000, null count 0",
+ "min 1970-01-01 00:00:00.002000000, max 1970-01-01
00:00:00.004000000, null count 1",
+ "min 1970-01-01 00:00:00.000002000, max 1970-01-01
00:00:00.000006000, null count 0",
+ "min null, max null, null count null",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/3);
+ }
+ {
+ std::string data_str = R"([
+ [null,null,null,null,null,null,null,null]
+ ])";
+ std::vector<std::string> expected_stats_str = {
+ "min null, max null, null count 1", "min null, max null, null
count 1",
+ "min null, max null, null count 1", "min null, max null, null
count null",
+ "min null, max null, null count 1", "min null, max null, null
count 1",
+ "min null, max null, null count 1", "min null, max null, null
count null",
+ };
+ CheckStats(fields, data_str, expected_stats_str,
/*expect_row_count=*/1);
+ }
+}
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/predicate_converter.cpp
b/src/paimon/format/parquet/predicate_converter.cpp
new file mode 100644
index 0000000..819d2cc
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter.cpp
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/expression.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon::parquet {
+// Returns the constant expression `true`. The converter hands this back wherever it declines to
+// translate a predicate (null predicate, node limit exceeded, unsupported literal type).
+arrow::compute::Expression PredicateConverter::AlwaysTrue() {
+    // Built on first use; every call returns a copy of the same literal.
+    static const auto kTrueLiteral = arrow::compute::literal(true);
+    return kTrueLiteral;
+}
+
+// Entry point: converts a paimon predicate tree into an arrow compute expression.
+// Yields AlwaysTrue() when `predicate` is null or when the node count computed by
+// CollectNodeCount is greater than `node_count_limit`; otherwise returns the result of
+// InnerConvert, which may be an error status.
+Result<arrow::compute::Expression> PredicateConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, uint32_t node_count_limit) {
+    if (predicate == nullptr) {
+        return AlwaysTrue();
+    }
+    uint32_t total_nodes = 0;
+    CollectNodeCount(predicate, &total_nodes);
+    if (total_nodes <= node_count_limit) {
+        return InnerConvert(predicate);
+    }
+    return AlwaysTrue();
+}
+
+// Adds to `*node_count` the number of expression nodes `predicate` expands to.
+// - A leaf counts as 1, plus one per literal for IN / NOT_IN, because those are expanded into
+//   Or(Equal...) / And(NotEqual...) during conversion.
+// - A compound counts as 1 plus the count of each of its children.
+// - A null predicate, or one that is neither leaf nor compound, adds nothing.
+void PredicateConverter::CollectNodeCount(const std::shared_ptr<Predicate>& predicate,
+                                          uint32_t* node_count) {
+    // InnerConvert accepts null (sub-)predicates, so counting must not dereference one either.
+    if (!predicate) {
+        return;
+    }
+    if (auto leaf_predicate = std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
+        // The function type only matters for leaves, so it is looked up here.
+        const auto function_type = leaf_predicate->GetFunction().GetType();
+        if (function_type == Function::Type::IN || function_type == Function::Type::NOT_IN) {
+            // IN and NOT_IN will be converted to Or(Equals) and And(NotEqual)
+            *node_count += leaf_predicate->Literals().size();
+        }
+        *node_count += 1;
+        return;
+    }
+    if (auto compound_predicate = std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
+        *node_count += 1;
+        for (const auto& child : compound_predicate->Children()) {
+            CollectNodeCount(child, node_count);
+        }
+    }
+}
+
+// Dispatches on the dynamic type of `predicate`: leaves go to ConvertLeaf, compounds to
+// ConvertCompound. A null predicate converts to AlwaysTrue(); any other predicate kind is
+// rejected with Status::Invalid.
+Result<arrow::compute::Expression> PredicateConverter::InnerConvert(
+    const std::shared_ptr<Predicate>& predicate) {
+    if (predicate == nullptr) {
+        return AlwaysTrue();
+    }
+    const auto as_leaf = std::dynamic_pointer_cast<LeafPredicate>(predicate);
+    if (as_leaf != nullptr) {
+        return ConvertLeaf(as_leaf);
+    }
+    const auto as_compound = std::dynamic_pointer_cast<CompoundPredicate>(predicate);
+    if (as_compound != nullptr) {
+        return ConvertCompound(as_compound);
+    }
+    return Status::Invalid("invalid predicate, must be leaf or compound");
+}
+
+// Converts an AND / OR compound predicate: every child is converted with InnerConvert (the first
+// failing child aborts the conversion with its status) and the results are combined with
+// arrow::compute::and_ or arrow::compute::or_. Any other function type is Status::Invalid.
+Result<arrow::compute::Expression> PredicateConverter::ConvertCompound(
+    const std::shared_ptr<CompoundPredicate>& compound_predicate) {
+    const auto function_type = compound_predicate->GetFunction().GetType();
+    const bool is_and = function_type == Function::Type::AND;
+    // Reject unsupported combinators before touching any child.
+    if (!is_and && function_type != Function::Type::OR) {
+        return Status::Invalid(
+            fmt::format("invalid predicate type {}", static_cast<int32_t>(function_type)));
+    }
+
+    const auto& children = compound_predicate->Children();
+    std::vector<arrow::compute::Expression> converted;
+    converted.reserve(children.size());
+    for (const auto& child : children) {
+        PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression child_expr, InnerConvert(child));
+        converted.push_back(std::move(child_expr));
+    }
+    if (is_and) {
+        return arrow::compute::and_(converted);
+    }
+    return arrow::compute::or_(converted);
+}
+
+// Returns Status::OK() when `literals` holds at least one element; otherwise Status::Invalid with
+// a message naming the function (function.ToString()) and the field.
+Status PredicateConverter::CheckLiteralNotEmpty(const std::vector<Literal>& literals,
+                                                const Function& function,
+                                                const std::string& field_name) {
+    if (!literals.empty()) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("predicate [{}] need literal on field {}", function.ToString(), field_name));
+}
+
+// Converts LITERAL with ConvertToArrowLiteral and declares `arrow_literal` (plus the intermediate
+// `arrow_literal_result`) in the enclosing scope. On failure it returns from the enclosing
+// function: AlwaysTrue() when the status is NotImplemented, otherwise the failing status itself.
+#define CONVERT_TO_ARROW_LITERAL(LITERAL)                           \
+    auto arrow_literal_result = ConvertToArrowLiteral(LITERAL);    \
+    if (!arrow_literal_result.ok()) {                              \
+        if (arrow_literal_result.status().IsNotImplemented()) {    \
+            return AlwaysTrue();                                   \
+        }                                                          \
+        return arrow_literal_result.status();                      \
+    }                                                              \
+    auto arrow_literal = std::move(arrow_literal_result).value();
+
+// Converts one leaf predicate into an arrow compute expression over the field named
+// leaf_predicate->FieldName().
+//
+// - IS_NULL / IS_NOT_NULL need no literal.
+// - EQUAL, NOT_EQUAL, GREATER_THAN, GREATER_OR_EQUAL, LESS_THAN and LESS_OR_EQUAL compare the
+//   field with literals[0].
+// - IN / NOT_IN are expanded into Or(Equal...) / And(NotEqual...) over all literals.
+// - STARTS_WITH, ENDS_WITH, CONTAINS and LIKE call the arrow functions "starts_with",
+//   "ends_with", "match_substring" and "match_like" with literals[0] as the pattern.
+//
+// Returns AlwaysTrue() when ConvertToArrowLiteral reports a literal type as NotImplemented, and
+// Status::Invalid when a required literal is missing or null or the function type is not handled.
+Result<arrow::compute::Expression> PredicateConverter::ConvertLeaf(
+    const std::shared_ptr<LeafPredicate>& leaf_predicate) {
+    using Expression = arrow::compute::Expression;
+    using ComparisonFactory = Expression (*)(Expression, Expression);
+
+    const auto& field_name = leaf_predicate->FieldName();
+    const auto& literals = leaf_predicate->Literals();
+    const auto& function = leaf_predicate->GetFunction();
+    const auto function_type = function.GetType();
+    const Expression field = arrow::compute::field_ref(field_name);
+
+    // Builds `field <op> literals[0]`; shared by the six comparison functions.
+    const auto compare = [&](ComparisonFactory make_comparison) -> Result<Expression> {
+        PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, field_name));
+        CONVERT_TO_ARROW_LITERAL(literals[0]);
+        return make_comparison(field, arrow_literal);
+    };
+
+    // Calls the arrow string function `function_name` with literals[0] as the pattern.
+    const auto match_pattern = [&](const char* function_name) -> Result<Expression> {
+        PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, field_name));
+        // The comparison path rejects null literals inside ConvertToArrowLiteral; do the same
+        // here before reading the value, since this path never goes through that check.
+        if (literals[0].IsNull()) {
+            return Status::Invalid("literal cannot be null in predicate");
+        }
+        auto options = std::make_shared<arrow::compute::MatchSubstringOptions>(
+            literals[0].GetValue<std::string>());
+        return arrow::compute::call(function_name, {field}, options);
+    };
+
+    switch (function_type) {
+        case Function::Type::IS_NULL:
+            return arrow::compute::is_null(field, /*nan_is_null=*/false);
+        case Function::Type::IS_NOT_NULL:
+            return arrow::compute::not_(arrow::compute::is_null(field, /*nan_is_null=*/false));
+        case Function::Type::EQUAL:
+            return compare(arrow::compute::equal);
+        case Function::Type::NOT_EQUAL:
+            return compare(arrow::compute::not_equal);
+        case Function::Type::GREATER_THAN:
+            return compare(arrow::compute::greater);
+        case Function::Type::GREATER_OR_EQUAL:
+            return compare(arrow::compute::greater_equal);
+        case Function::Type::LESS_THAN:
+            return compare(arrow::compute::less);
+        case Function::Type::LESS_OR_EQUAL:
+            return compare(arrow::compute::less_equal);
+        // Note: java paimon does not support pushing IN and NOT_IN down to parquet.
+        case Function::Type::IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, field_name));
+            // IN is converted to Or(Equal...)
+            std::vector<Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                sub_exprs.push_back(arrow::compute::equal(field, arrow_literal));
+            }
+            return arrow::compute::or_(sub_exprs);
+        }
+        case Function::Type::NOT_IN: {
+            PAIMON_RETURN_NOT_OK(CheckLiteralNotEmpty(literals, function, field_name));
+            // NOT_IN is converted to And(NotEqual...)
+            std::vector<Expression> sub_exprs;
+            sub_exprs.reserve(literals.size());
+            for (const auto& literal : literals) {
+                CONVERT_TO_ARROW_LITERAL(literal);
+                sub_exprs.push_back(arrow::compute::not_equal(field, arrow_literal));
+            }
+            return arrow::compute::and_(sub_exprs);
+        }
+        case Function::Type::STARTS_WITH:
+            return match_pattern("starts_with");
+        case Function::Type::ENDS_WITH:
+            return match_pattern("ends_with");
+        case Function::Type::CONTAINS:
+            return match_pattern("match_substring");
+        case Function::Type::LIKE:
+            return match_pattern("match_like");
+        default:
+            // Every case above returns, so nothing follows the switch.
+            return Status::Invalid(
+                fmt::format("invalid predicate type {}", static_cast<int32_t>(function_type)));
+    }
+}
+
+// Wraps a paimon literal into an arrow literal expression backed by the matching arrow scalar.
+// - A null literal is rejected with Status::Invalid.
+// - TIMESTAMP and BINARY yield Status::NotImplemented, which callers using
+//   CONVERT_TO_ARROW_LITERAL turn into AlwaysTrue().
+// - Any field type not listed below yields Status::Invalid.
+Result<arrow::compute::Expression> PredicateConverter::ConvertToArrowLiteral(
+    const Literal& literal) {
+    const auto literal_type = literal.GetType();
+    if (literal.IsNull()) {
+        return Status::Invalid("literal cannot be null in predicate");
+    }
+    switch (literal_type) {
+        case FieldType::BOOLEAN: {
+            const bool value = literal.GetValue<bool>();
+            return arrow::compute::literal(std::make_shared<arrow::BooleanScalar>(value));
+        }
+        case FieldType::TINYINT: {
+            const int8_t value = literal.GetValue<int8_t>();
+            return arrow::compute::literal(std::make_shared<arrow::Int8Scalar>(value));
+        }
+        case FieldType::SMALLINT: {
+            const int16_t value = literal.GetValue<int16_t>();
+            return arrow::compute::literal(std::make_shared<arrow::Int16Scalar>(value));
+        }
+        case FieldType::INT: {
+            const int32_t value = literal.GetValue<int32_t>();
+            return arrow::compute::literal(std::make_shared<arrow::Int32Scalar>(value));
+        }
+        case FieldType::DATE: {
+            // DATE literals are read as int32 and stored in a Date32 scalar.
+            const int32_t days = literal.GetValue<int32_t>();
+            return arrow::compute::literal(std::make_shared<arrow::Date32Scalar>(days));
+        }
+        case FieldType::BIGINT: {
+            const int64_t value = literal.GetValue<int64_t>();
+            return arrow::compute::literal(std::make_shared<arrow::Int64Scalar>(value));
+        }
+        case FieldType::FLOAT: {
+            const float value = literal.GetValue<float>();
+            return arrow::compute::literal(std::make_shared<arrow::FloatScalar>(value));
+        }
+        case FieldType::DOUBLE: {
+            const double value = literal.GetValue<double>();
+            return arrow::compute::literal(std::make_shared<arrow::DoubleScalar>(value));
+        }
+        case FieldType::STRING: {
+            return arrow::compute::literal(
+                std::make_shared<arrow::StringScalar>(literal.GetValue<std::string>()));
+        }
+        case FieldType::DECIMAL: {
+            // The scalar's type carries the literal's own precision and scale.
+            const auto decimal = literal.GetValue<Decimal>();
+            return arrow::compute::literal(std::make_shared<arrow::Decimal128Scalar>(
+                arrow::Decimal128(decimal.HighBits(), decimal.LowBits()),
+                arrow::decimal128(decimal.Precision(), decimal.Scale())));
+        }
+        // TODO(lisizhuo.lsz): java paimon does not support BINARY, TIMESTAMP and DECIMAL
+        // (DECIMAL is converted above; only TIMESTAMP and BINARY are declined here).
+        case FieldType::TIMESTAMP:
+        case FieldType::BINARY:
+            return Status::NotImplemented(
+                "Not support Binary and Timestamp predicate push down in parquet file "
+                "format");
+        default:
+            return Status::Invalid(
+                fmt::format("invalid literal type {}", static_cast<int32_t>(literal_type)));
+    }
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/predicate_converter.h
b/src/paimon/format/parquet/predicate_converter.h
new file mode 100644
index 0000000..5b44f84
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/expression.h"
+#include "paimon/predicate/compound_predicate.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class CompoundPredicate;
+class Function;
+class LeafPredicate;
+class Literal;
+class Predicate;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+// Static-only helper that converts paimon predicates into arrow compute expressions.
+// It cannot be instantiated: both the constructor and the destructor are deleted.
+class PredicateConverter {
+ public:
+    PredicateConverter() = delete;
+    ~PredicateConverter() = delete;
+
+    // The constant expression `true`.
+    static arrow::compute::Expression AlwaysTrue();
+
+    // Converts a paimon predicate to an arrow expression. If the total node count of the
+    // predicate exceeds predicate_node_count_limit, AlwaysTrue() is returned instead.
+    static Result<arrow::compute::Expression> Convert(const std::shared_ptr<Predicate>& predicate,
+                                                      uint32_t predicate_node_count_limit);
+
+ private:
+    // Adds the number of nodes of `predicate` to `*node_count`.
+    static void CollectNodeCount(const std::shared_ptr<Predicate>& predicate,
+                                 uint32_t* node_count);
+
+    // Recursive conversion of a leaf or compound predicate.
+    static Result<arrow::compute::Expression> InnerConvert(
+        const std::shared_ptr<Predicate>& predicate);
+
+    static Result<arrow::compute::Expression> ConvertLeaf(
+        const std::shared_ptr<LeafPredicate>& leaf_predicate);
+
+    static Result<arrow::compute::Expression> ConvertCompound(
+        const std::shared_ptr<CompoundPredicate>& compound_predicate);
+
+    // Wraps a single paimon literal into an arrow literal expression.
+    static Result<arrow::compute::Expression> ConvertToArrowLiteral(const Literal& literal);
+
+    // Fails when `literals` is empty; `function` and `field_name` are used for the message.
+    static Status CheckLiteralNotEmpty(const std::vector<Literal>& literals,
+                                       const Function& function, const std::string& field_name);
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/predicate_converter_test.cpp
b/src/paimon/format/parquet/predicate_converter_test.cpp
new file mode 100644
index 0000000..a6661dc
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_converter_test.cpp
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/predicate_converter.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "arrow/compute/expression.h"
+#include "gtest/gtest.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+TEST(PredicateConverterTest, TestSimple) {
+ //
"struct<f0:bigint,f1:double,f2:string,f3:int,f4:tinyint,f5:decimal(6,2),f6:date,f7:timestamp>";
+ {
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f0",
FieldType::BIGINT);
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("is_null(f0, {nan_is_null=false})", expression.ToString());
+ }
+ {
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/0,
/*field_name=*/"f0", FieldType::BIGINT);
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("invert(is_null(f0, {nan_is_null=false}))",
expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 == 5)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::INT, Literal(10));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f3 == 10)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/6,
/*field_name=*/"f6",
+ FieldType::DATE,
Literal(FieldType::DATE, 10));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f6 == 1970-01-11)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 != 5)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 > 5)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 >= 5)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterOrEqual(
+ /*field_index=*/4, /*field_name=*/"f4", FieldType::TINYINT,
+ Literal(static_cast<int8_t>(16)));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f4 >= 16)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::LessThan(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 < 5)", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(5l));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f0 <= 5)", expression.ToString());
+ }
+ {
+ auto predicate =
+ PredicateBuilder::In(/*field_index=*/0, /*field_name=*/"f0",
FieldType::BIGINT,
+ {Literal(1l), Literal(3l), Literal(5l)});
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(((f0 == 1) or (f0 == 3)) or (f0 == 5))",
expression.ToString());
+ }
+ {
+ auto predicate =
+ PredicateBuilder::NotIn(/*field_index=*/0, /*field_name=*/"f0",
FieldType::BIGINT,
+ {Literal(1l), Literal(3l), Literal(5l)});
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(((f0 != 1) and (f0 != 3)) and (f0 != 5))",
expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ const auto predicate,
+ PredicateBuilder::StartsWith(/*field_index=*/0,
/*field_name=*/"f0", FieldType::STRING,
+ Literal(FieldType::STRING, "aab",
3)));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("starts_with(f0, {pattern=\"aab\", ignore_case=false})",
expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ const auto predicate,
+ PredicateBuilder::EndsWith(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "bcc", 3)));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("ends_with(f0, {pattern=\"bcc\", ignore_case=false})",
expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ const auto predicate,
+ PredicateBuilder::Contains(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "abc", 3)));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("match_substring(f0, {pattern=\"abc\", ignore_case=false})",
+ expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ const auto predicate,
+ PredicateBuilder::Like(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "abc", 3)));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("match_like(f0, {pattern=\"abc\", ignore_case=false})",
expression.ToString());
+ }
+ {
+ // support decimal precision and scale mismatches between literal and
data
+ auto predicate = PredicateBuilder::In(/*field_index=*/7,
/*field_name=*/"f7",
+ FieldType::DECIMAL,
{Literal(Decimal(5, 1, 12345))});
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(f7 == 1234.5)", expression.ToString());
+ }
+ {
+ // do not support pushdown predicate with timestamp literal, will
always return true
+ auto predicate =
+ PredicateBuilder::In(/*field_index=*/6, /*field_name=*/"f6",
FieldType::TIMESTAMP,
+ {Literal(Timestamp(1000, 12345))});
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("true", expression.ToString());
+ }
+ {
+ auto predicate = PredicateBuilder::LessOrEqual(
+ /*field_index=*/0, /*field_name=*/"f0", FieldType::BIGINT,
Literal(FieldType::BIGINT));
+ ASSERT_NOK_WITH_MSG(
+ PredicateConverter::Convert(predicate,
/*predicate_node_count_limit=*/100),
+ "literal cannot be null in predicate");
+ }
+}
+
+TEST(PredicateConverterTest, TestCompound) {
+ //
"struct<f0:bigint,f1:float,f2:string,f3:boolean,f4:date,f5:timestamp,f6:decimal(6,2),f7:binary>";
+ {
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And({
+ PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f0", FieldType::BIGINT,
+ Literal(3l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(5.0))),
+ PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f2", FieldType::STRING,
+ Literal(FieldType::STRING, "apple",
5)),
+ PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3", FieldType::BOOLEAN,
+ Literal(true)),
+ PredicateBuilder::Equal(/*field_index=*/4,
/*field_name=*/"f4", FieldType::DATE,
+ Literal(FieldType::DATE, 3)),
+ PredicateBuilder::Equal(/*field_index=*/5, /*field_name=*/"f5",
+ FieldType::TIMESTAMP,
+ Literal(Timestamp(1725875365442l,
12000))),
+ PredicateBuilder::Equal(/*field_index=*/6,
/*field_name=*/"f6", FieldType::DECIMAL,
+ Literal(Decimal(6, 2, 123456))),
+ PredicateBuilder::Equal(/*field_index=*/7,
/*field_name=*/"f7", FieldType::BINARY,
+ Literal(FieldType::BINARY, "add", 3)),
+ }));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ(
+ "((((((((f0 == 3) and (f1 == 5)) and (f2 == \"apple\")) and (f3 ==
true)) and (f4 == "
+ "1970-01-04)) and true) and (f6 == 1234.56)) and true)",
+ expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or({
+ PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f0", FieldType::BIGINT,
+ Literal(3l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(5.0))),
+ PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f2", FieldType::STRING,
+ Literal(FieldType::STRING, "apple",
5)),
+ PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3", FieldType::BOOLEAN,
+ Literal(true)),
+ }));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("((((f0 == 3) or (f1 == 5)) or (f2 == \"apple\")) or (f3 ==
true))",
+ expression.ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::And(
+ {PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
Literal(true)),
+ PredicateBuilder::LessThan(/*field_index=*/0,
/*field_name=*/"f0",
+ FieldType::BIGINT,
Literal(3l))})
+ .value(),
+ PredicateBuilder::And(
+ {PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
Literal(false)),
+ PredicateBuilder::LessThan(/*field_index=*/1,
/*field_name=*/"f1",
+ FieldType::FLOAT,
+
Literal(static_cast<float>(3.1)))})
+ .value()}));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(((f3 == true) and (f0 < 3)) or ((f3 == false) and (f1 <
3.1)))",
+ expression.ToString());
+ }
+ {
+ // predicate nodes containing binary type will not be pushed down
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::Or(
+ {PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
Literal(true)),
+ PredicateBuilder::LessThan(
+ /*field_index=*/7, /*field_name=*/"f7",
FieldType::BINARY,
+ Literal(FieldType::BINARY, "add", 3))})
+ .value(),
+ PredicateBuilder::Or(
+ {PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
Literal(false)),
+ PredicateBuilder::LessThan(/*field_index=*/1,
/*field_name=*/"f1",
+ FieldType::FLOAT,
+
Literal(static_cast<float>(3.1)))})
+ .value()}));
+ ASSERT_OK_AND_ASSIGN(auto expression, PredicateConverter::Convert(
+ predicate,
/*predicate_node_count_limit=*/100));
+ ASSERT_EQ("(((f3 == true) or true) and ((f3 == false) or (f1 < 3.1)))",
+ expression.ToString());
+ }
+}
+
+TEST(PredicateConverterTest, TestCollectNodeCount) {
+ // And([And([Equal(f0, 10), NotEqual(f1, 20)]), Or([GreaterThan(v0, 30), LessThan(f3, 20)]),
+ // Or([LessOrEqual(f2, 50), In(f0, [20, 60]), NotIn(f0, [120, 160])]), GreaterOrEqual(v1, 120)])
+ auto equal = PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f0", FieldType::INT,
+ Literal(10));
+ auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1,
/*field_name=*/"f1",
+ FieldType::INT, Literal(20));
+ // and_predicate has 3 nodes
+ ASSERT_OK_AND_ASSIGN(auto and_predicate, PredicateBuilder::And({equal,
not_equal}));
+ uint32_t node_count = 0;
+ PredicateConverter::CollectNodeCount(and_predicate, &node_count);
+ ASSERT_EQ(node_count, 3);
+
+ auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/2,
/*field_name=*/"v0",
+ FieldType::INT,
Literal(30));
+ auto less_than = PredicateBuilder::LessThan(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::INT, Literal(20));
+ // or_predicate has 3 nodes
+ ASSERT_OK_AND_ASSIGN(auto or_predicate,
PredicateBuilder::Or({greater_than, less_than}));
+ node_count = 0;
+ PredicateConverter::CollectNodeCount(or_predicate, &node_count);
+ ASSERT_EQ(node_count, 3);
+
+ auto less_or_equal = PredicateBuilder::LessOrEqual(/*field_index=*/4,
/*field_name=*/"f2",
+ FieldType::INT,
Literal(50));
+ auto in = PredicateBuilder::In(/*field_index=*/5, /*field_name=*/"f0",
FieldType::INT,
+ {Literal(20), Literal(60)});
+ auto not_in = PredicateBuilder::NotIn(/*field_index=*/5,
/*field_name=*/"f0", FieldType::INT,
+ {Literal(120), Literal(160)});
+ // or_predicate2 has 8 nodes
+ ASSERT_OK_AND_ASSIGN(auto or_predicate2,
PredicateBuilder::Or({less_or_equal, in, not_in}));
+ node_count = 0;
+ PredicateConverter::CollectNodeCount(or_predicate2, &node_count);
+ ASSERT_EQ(node_count, 8);
+
+ auto greater_or_equal =
PredicateBuilder::GreaterOrEqual(/*field_index=*/4, /*field_name=*/"v1",
+ FieldType::INT,
Literal(120));
+ node_count = 0;
+ PredicateConverter::CollectNodeCount(greater_or_equal, &node_count);
+ ASSERT_EQ(node_count, 1);
+
+ // predicate has (1+3+3+8+1) nodes
+ ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({and_predicate,
or_predicate,
+ or_predicate2,
greater_or_equal}));
+ node_count = 0;
+ PredicateConverter::CollectNodeCount(predicate, &node_count);
+ ASSERT_EQ(node_count, 16);
+}
+
+TEST(PredicateConverterTest, TestExceedNodeCountLimit) {
+ // And([And([Equal(f0, 10), NotEqual(f1, 20)]), Or([GreaterThan(v0, 30), LessThan(f3, 20)]),
+ // Or([LessOrEqual(f2, 50), In(f0, [20, 60]), NotIn(f0, [120, 160])]), GreaterOrEqual(v1, 120)])
+ auto equal = PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f0", FieldType::INT,
+ Literal(10));
+ auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1,
/*field_name=*/"f1",
+ FieldType::INT, Literal(20));
+ // and_predicate has 3 nodes
+ ASSERT_OK_AND_ASSIGN(auto and_predicate, PredicateBuilder::And({equal,
not_equal}));
+
+ auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/2,
/*field_name=*/"v0",
+ FieldType::INT,
Literal(30));
+ auto less_than = PredicateBuilder::LessThan(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::INT, Literal(20));
+ // or_predicate has 3 nodes
+ ASSERT_OK_AND_ASSIGN(auto or_predicate,
PredicateBuilder::Or({greater_than, less_than}));
+
+ auto less_or_equal = PredicateBuilder::LessOrEqual(/*field_index=*/4,
/*field_name=*/"f2",
+ FieldType::INT,
Literal(50));
+ auto in = PredicateBuilder::In(/*field_index=*/5, /*field_name=*/"f0",
FieldType::INT,
+ {Literal(20), Literal(60)});
+ auto not_in = PredicateBuilder::NotIn(/*field_index=*/5,
/*field_name=*/"f0", FieldType::INT,
+ {Literal(120), Literal(160)});
+ // or_predicate2 has 8 nodes
+ ASSERT_OK_AND_ASSIGN(auto or_predicate2,
PredicateBuilder::Or({less_or_equal, in, not_in}));
+
+ auto greater_or_equal =
PredicateBuilder::GreaterOrEqual(/*field_index=*/4, /*field_name=*/"v1",
+ FieldType::INT,
Literal(120));
+
+ // predicate has (1+3+3+8+1) nodes
+ ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({and_predicate,
or_predicate,
+ or_predicate2,
greater_or_equal}));
+
+ ASSERT_OK_AND_ASSIGN(auto expression,
+ PredicateConverter::Convert(predicate,
/*predicate_node_count_limit=*/20));
+ ASSERT_EQ(expression.ToString(),
+ "(((((f0 == 10) and (f1 != 20)) and ((v0 > 30) or (f3 < 20)))
and (((f2 <= 50) or "
+ "((f0 == 20) or (f0 == 60))) or ((f0 != 120) and (f0 != 160))))
and (v1 >= 120))");
+ ASSERT_OK_AND_ASSIGN(auto expression_always_true,
+ PredicateConverter::Convert(predicate,
/*predicate_node_count_limit=*/10));
+ ASSERT_EQ(expression_always_true.ToString(), "true");
+}
+
+// A null predicate must convert to the constant expression "true".
+TEST(PredicateConverterTest, TestWithoutPredicate) {
+    ASSERT_OK_AND_ASSIGN(
+        auto converted,
+        PredicateConverter::Convert(nullptr, /*predicate_node_count_limit=*/100));
+    ASSERT_EQ("true", converted.ToString());
+}
+
+// An IN predicate without any literal is rejected by the converter.
+TEST(PredicateConverterTest, TestInvalidCase) {
+    auto empty_in = PredicateBuilder::In(/*field_index=*/0, /*field_name=*/"f0", FieldType::INT,
+                                         {});
+    ASSERT_NOK_WITH_MSG(
+        PredicateConverter::Convert(empty_in, /*predicate_node_count_limit=*/100),
+        "predicate [In] need literal on field f0");
+}
+
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp
b/src/paimon/format/parquet/predicate_pushdown_test.cpp
new file mode 100644
index 0000000..67d7cd5
--- /dev/null
+++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp
@@ -0,0 +1,813 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;  // forward declaration; spelled out in this file only as std::shared_ptr<Predicate> (CheckResult parameter)
+} // namespace paimon
+
+namespace paimon::parquet::test {
+
+class PredicatePushdownTest : public ::testing::Test {  // Fixture: writes a struct array to a parquet file in a unique test directory, then reads it back with a predicate.
+ public:
+ void SetUp() override {  // Builds the default 4-row sample (f0..f5) and resolves the per-test file path and file system.
+ pool_ = GetDefaultPool();
+ arrow_pool_ = GetArrowPool(pool_);
+ batch_size_ = 10;
+
+ arrow::FieldVector fields = {
+ arrow::field("f0", arrow::utf8()), arrow::field("f1",
arrow::float32()),
+ arrow::field("f2", arrow::int64()), arrow::field("f3",
arrow::boolean()),
+ arrow::field("f4", arrow::int64()), arrow::field("f5",
arrow::binary())};
+
+ struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(  // sample: f1 is 4.0 or null, f3 is always true, f4 is always null
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields),
R"([
+ ["apple", 4.0, 4, true, null, "add"], ["banana", 4.0, 6, true, null,
"bad"],
+ ["camera", 4.0, 8, true, null, "cat"], ["data", null, 10, true, null,
"dad"]
+ ])")
+ .ValueOrDie());
+ dir_ = paimon::test::UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir_);
+ file_name_ = dir_->Str() + "/test.data";
+ fs_ = dir_->GetFileSystem();
+ }
+
+ void TearDown() override {}
+
+ void PrepareTestData(const std::shared_ptr<arrow::StructArray>&
struct_array) {  // Exports struct_array through the Arrow C data interface and writes it as one batch to file_name_.
+ auto data_type = struct_array->struct_type();
+ auto data_schema = arrow::schema(data_type->fields());
+ auto data_arrow_array = std::make_unique<ArrowArray>();
+ ASSERT_TRUE(arrow::ExportArray(*struct_array,
data_arrow_array.get()).ok());
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+ fs_->Create(file_name_, /*overwrite=*/false));
+ ::parquet::WriterProperties::Builder builder;
+ builder.write_batch_size(batch_size_);
+ auto writer_properties = builder.build();
+ ASSERT_OK_AND_ASSIGN(
+ auto format_writer,
+ ParquetFormatWriter::Create(out, data_schema, writer_properties,
+ DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE,
arrow_pool_));
+ ASSERT_OK(format_writer->AddBatch(data_arrow_array.get()));
+ ASSERT_OK(format_writer->Finish());
+ ASSERT_OK(out->Close());
+ }
+
+ void CheckResult(const std::shared_ptr<arrow::Schema>& read_schema,
+ const std::shared_ptr<Predicate>& predicate,
+ const std::shared_ptr<arrow::Array>& expected_array,
+ uint32_t predicate_node_count_limit =
+
paimon::parquet::DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT) {  // Reads file_name_ with read_schema + predicate; a null expected_array means "the reader must return nothing".
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in,
fs_->Open(file_name_));
+ ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
+ auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in,
arrow_pool_, length);
+
+ std::map<std::string, std::string> options;
+ options[paimon::parquet::PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT] =
+ std::to_string(predicate_node_count_limit);
+ ASSERT_OK_AND_ASSIGN(auto batch_reader,
+
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_,
+ options,
batch_size_));
+ std::unique_ptr<ArrowSchema> c_schema =
std::make_unique<ArrowSchema>();
+ auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+ ASSERT_TRUE(arrow_status.ok());
+ ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate,
+
/*selection_bitmap=*/std::nullopt));
+ ASSERT_OK_AND_ASSIGN(auto arrow_array,
+
paimon::test::ReadResultCollector::CollectResult(batch_reader.get()));
+ if (expected_array) {  // data expected: compare against expected_array wrapped as a single-chunk ChunkedArray
+ ASSERT_TRUE(arrow_array);
+ auto expected_chunk_array =
std::make_shared<arrow::ChunkedArray>(expected_array);
+ ASSERT_TRUE(expected_chunk_array->Equals(arrow_array)) <<
arrow_array->ToString();
+ } else {
+ ASSERT_FALSE(arrow_array);
+ }
+ }
+
+ protected:  // TEST_F bodies are subclasses of this fixture, so the members they read must not be private.
+ std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+ std::shared_ptr<MemoryPool> pool_;
+ int32_t batch_size_;
+ std::shared_ptr<arrow::StructArray> struct_array_;
+ std::shared_ptr<FileSystem> fs_;
+ std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+ std::string file_name_;
+};
+
+TEST_F(PredicatePushdownTest, TestIntDoubleData) {  // Predicates on the FLOAT column f1 (4.0, 4.0, 4.0, null) and the BIGINT column f2 (4, 6, 8, 10).
+ PrepareTestData(struct_array_);
+ auto data_type = struct_array_->struct_type();
+ arrow::FieldVector fields = {data_type->GetFieldByName("f0"),
data_type->GetFieldByName("f1"),
+ data_type->GetFieldByName("f2"),
data_type->GetFieldByName("f3"),
+ data_type->GetFieldByName("f4")};
+ auto read_schema = arrow::schema(fields);
+ std::shared_ptr<arrow::Array> expected_array =
+ arrow::StructArray::Make(
+ {struct_array_->GetFieldByName("f0"),
struct_array_->GetFieldByName("f1"),
+ struct_array_->GetFieldByName("f2"),
struct_array_->GetFieldByName("f3"),
+ struct_array_->GetFieldByName("f4")},
+ fields)
+ .ValueOrDie();
+ {
+ // f1 == 4, has data
+ auto predicate =
+ PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::FLOAT,
+ Literal(static_cast<float>(4.0)));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f1 == 6, no data
+ auto predicate =
+ PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::FLOAT,
+ Literal(static_cast<float>(6.0)));
+ CheckResult(read_schema, predicate, /*expected_array=*/
+ nullptr);
+ }
+ {
+ // f1 != 4, no data (every non-null f1 value is 4.0)
+ auto predicate = PredicateBuilder::NotEqual(
+ /*field_index=*/1, /*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0)));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f2 != 4, has data
+ auto predicate = PredicateBuilder::NotEqual(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT,
Literal(4l));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 == 6, has data
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT,
Literal(6l));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 == 1, no data
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT,
Literal(1l));
+ CheckResult(read_schema, predicate, /*expected_array=*/
+ nullptr);
+ }
+ {
+ // f2 in [1,2,3], no data
+ auto predicate =
+ PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2",
FieldType::BIGINT,
+ {Literal(1l), Literal(2l), Literal(3l)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f2 in [1,2,3] again, but with predicate_node_count_limit = 1: the same predicate now returns data
+ auto predicate =
+ PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2",
FieldType::BIGINT,
+ {Literal(1l), Literal(2l), Literal(3l)});
+ CheckResult(read_schema, predicate, expected_array,
+ /*predicate_node_count_limit=*/1);
+ }
+ {
+ // f2 not in [1,2,3], has data
+ auto predicate =
+ PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2",
FieldType::BIGINT,
+ {Literal(1l), Literal(2l), Literal(3l)});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 in [2,3,4], has data
+ auto predicate =
+ PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2",
FieldType::BIGINT,
+ {Literal(2l), Literal(3l), Literal(4l)});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 not in [2,3,4], has data
+ auto predicate =
+ PredicateBuilder::NotIn(/*field_index=*/2, /*field_name=*/"f2",
FieldType::BIGINT,
+ {Literal(2l), Literal(3l), Literal(4l)});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestBoolData) {  // Predicates on the BOOLEAN column f3, which is true in every sample row.
+ PrepareTestData(struct_array_);
+ auto data_type = struct_array_->struct_type();
+ arrow::FieldVector fields = {data_type->GetFieldByName("f0"),
data_type->GetFieldByName("f1"),
+ data_type->GetFieldByName("f2"),
data_type->GetFieldByName("f3"),
+ data_type->GetFieldByName("f4")};
+ auto read_schema = arrow::schema(fields);
+ std::shared_ptr<arrow::Array> expected_array =
+ arrow::StructArray::Make(
+ {struct_array_->GetFieldByName("f0"),
struct_array_->GetFieldByName("f1"),
+ struct_array_->GetFieldByName("f2"),
struct_array_->GetFieldByName("f3"),
+ struct_array_->GetFieldByName("f4")},
+ fields)
+ .ValueOrDie();
+ {
+ // f3 is null, no data
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f3",
FieldType::BOOLEAN);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f3 is not null, has data
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/3,
/*field_name=*/"f3", FieldType::BOOLEAN);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f3 == true, has data
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
Literal(true));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {  // f3 in [false], no data
+ auto predicate = PredicateBuilder::In(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN,
{Literal(false)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestStringData) {  // Predicates on the STRING column f0 ("apple", "banana", "camera", "data").
+ PrepareTestData(struct_array_);
+ auto data_type = struct_array_->struct_type();
+ arrow::FieldVector fields = {data_type->GetFieldByName("f0"),
data_type->GetFieldByName("f1"),
+ data_type->GetFieldByName("f2"),
data_type->GetFieldByName("f3"),
+ data_type->GetFieldByName("f4")};
+ auto read_schema = arrow::schema(fields);
+ std::shared_ptr<arrow::Array> expected_array =
+ arrow::StructArray::Make(
+ {struct_array_->GetFieldByName("f0"),
struct_array_->GetFieldByName("f1"),
+ struct_array_->GetFieldByName("f2"),
struct_array_->GetFieldByName("f3"),
+ struct_array_->GetFieldByName("f4")},
+ fields)
+ .ValueOrDie();
+ {
+ // f0 is null, no data
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f0 is not null, has data
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/0,
/*field_name=*/"f0", FieldType::STRING);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f0 == apple, has data
+ auto predicate =
+ PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "apple", 5));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f0 == anything, no data
+ auto predicate =
+ PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "anything", 8));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f0 > zooooooo, no data
+ auto predicate = PredicateBuilder::GreaterThan(
+ /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
+ Literal(FieldType::STRING, "zooooooo", 8));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f0 like 'ba%', has data
+ ASSERT_OK_AND_ASSIGN(const auto predicate,
+ PredicateBuilder::StartsWith(
+ /*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "ba", 2)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+ {
+ // f0 like '%ta', has data
+ ASSERT_OK_AND_ASSIGN(const auto predicate,
+ PredicateBuilder::EndsWith(
+ /*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "ta", 2)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+ {
+ // f0 like '%me%', has data
+ ASSERT_OK_AND_ASSIGN(const auto predicate,
+ PredicateBuilder::Contains(
+ /*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "me", 2)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+ {
+ // f0 like 'me': no row equals 'me', yet the full data is expected back (presumably Like is not used for pruning - TODO confirm)
+ ASSERT_OK_AND_ASSIGN(const auto predicate,
+ PredicateBuilder::Like(
+ /*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
+ Literal(FieldType::STRING, "me", 2)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestBinaryData) {  // Comparison predicates on the BINARY column f5 ("add", "bad", "cat", "dad") must not filter anything.
+ PrepareTestData(struct_array_);
+ auto data_type = struct_array_->struct_type();
+ arrow::FieldVector fields = {data_type->GetFieldByName("f5")};
+ auto read_schema = arrow::schema(fields);
+ std::shared_ptr<arrow::Array> expected_array =
+ arrow::StructArray::Make({struct_array_->GetFieldByName("f5")},
fields).ValueOrDie();
+ // paimon does not pushdown binary type to parquet, will skip this
predicate
+ {
+ // f5 < aaa, do not have data, but binary predicate will be skipped
+ auto predicate =
+ PredicateBuilder::LessThan(/*field_index=*/5, /*field_name=*/"f5",
FieldType::BINARY,
+ Literal(FieldType::BINARY, "aaa", 3));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f5 >= zoo, do not have data, but binary predicate will be skipped
+ auto predicate = PredicateBuilder::GreaterOrEqual(
+ /*field_index=*/5, /*field_name=*/"f5", FieldType::BINARY,
+ Literal(FieldType::BINARY, "zoo", 3));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestPredicatePushdownWithAllDataNull) {  // Predicates on the BIGINT column f4, which is null in every sample row.
+ PrepareTestData(struct_array_);
+ auto data_type = struct_array_->struct_type();
+ arrow::FieldVector fields = {data_type->GetFieldByName("f0"),
data_type->GetFieldByName("f1"),
+ data_type->GetFieldByName("f2"),
data_type->GetFieldByName("f3"),
+ data_type->GetFieldByName("f4")};
+ auto read_schema = arrow::schema(fields);
+ std::shared_ptr<arrow::Array> expected_array =
+ arrow::StructArray::Make(
+ {struct_array_->GetFieldByName("f0"),
struct_array_->GetFieldByName("f1"),
+ struct_array_->GetFieldByName("f2"),
struct_array_->GetFieldByName("f3"),
+ struct_array_->GetFieldByName("f4")},
+ fields)
+ .ValueOrDie();
+ {
+ // f4 is null, has data
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/4, /*field_name=*/"f4",
FieldType::BIGINT);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+
+ // every other predicate on the all-null column is expected to return no data
+ {
+ // f4 in [1,2], no data
+ auto predicate = PredicateBuilder::In(/*field_index=*/4,
/*field_name=*/"f4",
+ FieldType::BIGINT, {Literal(1l),
Literal(2l)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f4 not in [1,2], no data
+ auto predicate = PredicateBuilder::NotIn(/*field_index=*/4,
/*field_name=*/"f4",
+ FieldType::BIGINT,
{Literal(1l), Literal(2l)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f4 >= 3, no data
+ auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/4,
/*field_name=*/"f4",
+ FieldType::BIGINT,
Literal(3l));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f4 <= 3, no data
+ auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/4,
/*field_name=*/"f4",
+ FieldType::BIGINT,
Literal(3l));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestCompoundPredicate) {  // And/Or combinations over f1, f2, f3 and the BINARY column f5; reads all six columns.
+ PrepareTestData(struct_array_);
+ auto read_schema = arrow::schema(struct_array_->struct_type()->fields());
+ std::shared_ptr<arrow::Array> expected_array = struct_array_;
+ {
+ // f2 < 6 and f1 == 4 and f3 == true, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0))),
+ PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3", FieldType::BOOLEAN,
+ Literal(true))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 6 and f1 == 4 and f3 is null, no data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0))),
+ PredicateBuilder::IsNull(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN)}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f2 < 6 and f1 == 4 and f5 is null, no data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0))),
+ PredicateBuilder::IsNull(/*field_index=*/5,
/*field_name=*/"f5",
+ FieldType::BINARY)}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f2 < 6 and f1 == 4 and f5 == zoo, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0))),
+ PredicateBuilder::Equal(/*field_index=*/5,
/*field_name=*/"f5", FieldType::BINARY,
+ Literal(FieldType::BINARY, "zoo",
3))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 6 and f1 == 5 and f5 is null, no data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::And(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(5.0))),
+ PredicateBuilder::IsNull(/*field_index=*/5,
/*field_name=*/"f5",
+ FieldType::BINARY)}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ // f2 < 6 or f1 == 4, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0)))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 6 or f1 == 5, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(6l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(5.0)))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 2 or f5 is null, no data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+
PredicateBuilder::Or({PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+
FieldType::BIGINT, Literal(2l)),
+ PredicateBuilder::IsNull(/*field_index=*/5,
/*field_name=*/"f5",
+
FieldType::BINARY)}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, nullptr);
+ }
+ {
+ // f2 < 2 or f5 == zoo: the whole Or predicate is ignored because it contains a binary comparison, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(2l)),
+ PredicateBuilder::Equal(/*field_index=*/5,
/*field_name=*/"f5", FieldType::BINARY,
+ Literal(FieldType::BINARY, "zoo",
3))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 2 or f1 == 4 or f3 == false, has data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(2l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(4.0))),
+ PredicateBuilder::Equal(/*field_index=*/3,
/*field_name=*/"f3", FieldType::BOOLEAN,
+ Literal(false))}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ // f2 < 2 or f1 == 5 or f3 is null, no data
+ ASSERT_OK_AND_ASSIGN(
+ auto predicate,
+ PredicateBuilder::Or(
+ {PredicateBuilder::LessThan(/*field_index=*/2,
/*field_name=*/"f2",
+ FieldType::BIGINT, Literal(2l)),
+ PredicateBuilder::Equal(/*field_index=*/1,
/*field_name=*/"f1", FieldType::FLOAT,
+ Literal(static_cast<float>(5.0))),
+ PredicateBuilder::IsNull(/*field_index=*/3,
/*field_name=*/"f3",
+ FieldType::BOOLEAN)}));
+ ASSERT_TRUE(predicate);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestComplexType) {  // Despite the name, this covers date32, timestamp(ns) and decimal128(23, 5) columns; no nested types are involved.
+ arrow::FieldVector fields = {
+ arrow::field("f1", arrow::int32()),
+ arrow::field("f2", arrow::int32()),
+ arrow::field("f3", arrow::date32()),
+ arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f5", arrow::decimal128(23, 5)),
+ };
+ auto read_schema = arrow::schema(fields);
+ auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+ [10, 1, 1234, "2033-05-18 03:33:20.0",
"123456789987654321.45678"],
+ [10, 1, 19909, "2033-05-18 03:33:20.000001001", "12.30000"],
+ [10, 1, 0, "2008-12-28 00:00:00.000123456", "12.30000"],
+ [10, 1, 100, "2008-12-28 00:00:00.00012345", "-123.45000"],
+ [10, 1, 100, "1899-01-01 00:59:20.001001001", "0.00000"],
+ [10, 1, 20006, "2024-10-10 10:10:10.100100100", "1728551410100.10010"]
+ ])")
+ .ValueOrDie());
+ PrepareTestData(expected_array);
+ // date (column f3; values range from 0 to 20006)
+ {
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f3",
FieldType::DATE);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/2,
/*field_name=*/"f3", FieldType::DATE);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {  // f3 == 4: no row holds 4, but 4 lies inside the column's value range and data is expected
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f3",
+ FieldType::DATE,
Literal(FieldType::DATE, 4));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {  // f3 == -111: below the smallest value, no data
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/2,
/*field_name=*/"f3",
+ FieldType::DATE,
Literal(FieldType::DATE, -111));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::LessThan(
+ /*field_index=*/2, /*field_name=*/"f3", FieldType::DATE,
+ Literal(FieldType::DATE, 20006));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {  // f3 > 20006: 20006 is the largest value, no data
+ auto predicate = PredicateBuilder::GreaterThan(
+ /*field_index=*/2, /*field_name=*/"f3", FieldType::DATE,
+ Literal(FieldType::DATE, 20006));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+
+ // timestamp (column f4): these predicates are always ignored, so every case expects the full data
+ {
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f4",
FieldType::TIMESTAMP);
+ CheckResult(read_schema, predicate, /*expected_array=*/
+ expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::IsNotNull(/*field_index=*/3,
/*field_name=*/"f4",
+ FieldType::TIMESTAMP);
+ CheckResult(read_schema, predicate, /*expected_array=*/
+ expected_array);
+ }
+ {
+ auto predicate =
+ PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f4",
FieldType::TIMESTAMP,
+ Literal(Timestamp(2240521239999l, 0)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+ {
+ auto predicate =
+ PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f4",
FieldType::TIMESTAMP,
+ Literal(Timestamp(1230422400000l,
123460)));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3,
/*field_name=*/"f4",
+ FieldType::TIMESTAMP,
+
Literal(Timestamp(2000000000000l, 1001)));
+ CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
+ }
+
+ // decimal (column f5)
+ {
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/4, /*field_name=*/"f5",
FieldType::DECIMAL);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/4,
/*field_name=*/"f5", FieldType::DECIMAL);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ Literal(Decimal(23, 5, 123456)));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ Literal(Decimal(22, 3, -123456)));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::LessThan(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ Literal(Decimal(23, 3,
DecimalUtils::StrToInt128("-123456789987654321567").value())));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::LessThan(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ Literal(Decimal(23, 3,
DecimalUtils::StrToInt128("123456789987654321567").value())));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterThan(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ Literal(Decimal(23, 3,
DecimalUtils::StrToInt128("123456789987654321567").value())));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::In(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ {Literal(Decimal(23, 5,
DecimalUtils::StrToInt128("-12345678998765432134567").value())),
+ Literal(Decimal(23, 5, 1234567))});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::NotIn(
+ /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL,
+ {Literal(Decimal(23, 5,
DecimalUtils::StrToInt128("-12345678998765432134567").value())),
+ Literal(Decimal(23, 5, 1234567))});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+}
+
+TEST_F(PredicatePushdownTest, TestAllNullOrAllSameValue) {  // Two degenerate INT columns: f1 is null in every row, f2 is 10 in every row.
+ arrow::FieldVector fields = {arrow::field("f1", arrow::int32()),
+ arrow::field("f2", arrow::int32())};
+ auto read_schema = arrow::schema(fields);
+ auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+ [null, 10],
+ [null, 10],
+ [null, 10]
+ ])")
+ .ValueOrDie());
+ PrepareTestData(expected_array);
+ // for f1 (null in every row): only IsNull is expected to return data
+ {
+ auto predicate =
+ PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f1",
FieldType::INT);
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate =
+ PredicateBuilder::IsNotNull(/*field_index=*/0,
/*field_name=*/"f1", FieldType::INT);
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::Equal(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT, Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::LessThan(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::In(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT, {Literal(10),
Literal(20)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::NotIn(/*field_index=*/0,
/*field_name=*/"f1",
+ FieldType::INT, {Literal(10),
Literal(20)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ // for f2 (10 in every row): data is expected only when the predicate can hold for the value 10
+ {
+ auto predicate = PredicateBuilder::NotEqual(/*field_index=*/1,
/*field_name=*/"f2",
+ FieldType::INT,
Literal(10));
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::NotEqual(/*field_index=*/1,
/*field_name=*/"f2",
+ FieldType::INT,
Literal(30));
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::In(/*field_index=*/1,
/*field_name=*/"f2",
+ FieldType::INT, {Literal(10),
Literal(20)});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+ {
+ auto predicate = PredicateBuilder::NotIn(/*field_index=*/1,
/*field_name=*/"f2",
+ FieldType::INT, {Literal(10),
Literal(20)});
+ CheckResult(read_schema, predicate, /*expected_array=*/nullptr);
+ }
+ {
+ auto predicate = PredicateBuilder::NotIn(/*field_index=*/1,
/*field_name=*/"f2",
+ FieldType::INT, {Literal(20),
Literal(30)});
+ CheckResult(read_schema, predicate, expected_array);
+ }
+}
+} // namespace paimon::parquet::test