This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 5e55d84 feat(format): introduce parquet format components (#48)
5e55d84 is described below
commit 5e55d84d5818544c5a9357b4c48f6f6a2e123724
Author: Zhang Jiawei <[email protected]>
AuthorDate: Thu Jun 4 15:30:55 2026 +0800
feat(format): introduce parquet format components (#48)
---
LICENSE | 1 +
.../format/parquet/parquet_field_id_converter.cpp | 120 +++++++++++
.../format/parquet/parquet_field_id_converter.h | 57 +++++
.../parquet/parquet_field_id_converter_test.cpp | 224 +++++++++++++++++++
src/paimon/format/parquet/parquet_file_format.h | 80 +++++++
.../format/parquet/parquet_file_format_factory.cpp | 37 ++++
.../format/parquet/parquet_file_format_factory.h | 43 ++++
src/paimon/format/parquet/parquet_format_defs.h | 76 +++++++
src/paimon/format/parquet/parquet_reader_builder.h | 66 ++++++
src/paimon/format/parquet/parquet_schema_util.cpp | 230 ++++++++++++++++++++
src/paimon/format/parquet/parquet_schema_util.h | 58 +++++
.../format/parquet/parquet_timestamp_converter.cpp | 233 ++++++++++++++++++++
.../format/parquet/parquet_timestamp_converter.h | 46 ++++
.../parquet/parquet_timestamp_converter_test.cpp | 167 +++++++++++++++
.../format/parquet/parquet_writer_builder.cpp | 130 +++++++++++
src/paimon/format/parquet/parquet_writer_builder.h | 83 ++++++++
.../format/parquet/parquet_writer_builder_test.cpp | 237 +++++++++++++++++++++
17 files changed, 1888 insertions(+)
diff --git a/LICENSE b/LICENSE
index 103c326..c69084f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -270,6 +270,7 @@ This product includes code from Apache Arrow.
* Core utilities:
* docs utilities in docs/ directory
+ * parquet schema convertor in
src/paimon/format/parquet/parquet_schema_util.cpp and
src/paimon/format/parquet/parquet_schema_util.h
* basic utilities in
- include/paimon/compare.h
- include/paimon/macros.h
diff --git a/src/paimon/format/parquet/parquet_field_id_converter.cpp
b/src/paimon/format/parquet/parquet_field_id_converter.cpp
new file mode 100644
index 0000000..010df3a
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+
+#include <algorithm>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+
+namespace paimon::parquet {
+
+using arrow::Field;
+using arrow::KeyValueMetadata;
+
+const char ParquetFieldIdConverter::PARQUET_FIELD_ID[] = "PARQUET:field_id";
+
+Result<std::shared_ptr<arrow::Schema>>
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(
+ const std::shared_ptr<arrow::Schema>& schema) {
+ std::vector<std::shared_ptr<Field>> new_fields;
+ for (const auto& field : schema->fields()) {
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+ std::shared_ptr<Field> new_field,
+ ProcessField(field,
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID));
+ new_fields.push_back(new_field);
+ }
+ return arrow::schema(new_fields, schema->metadata());
+}
+
+Result<std::shared_ptr<arrow::Schema>>
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(
+ const std::shared_ptr<arrow::Schema>& schema) {
+ std::vector<std::shared_ptr<Field>> new_fields;
+ for (const auto& field : schema->fields()) {
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+ std::shared_ptr<Field> new_field,
+ ProcessField(field,
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID));
+ new_fields.push_back(new_field);
+ }
+ return arrow::schema(new_fields, schema->metadata());
+}
+
+arrow::Result<std::shared_ptr<const KeyValueMetadata>>
ParquetFieldIdConverter::CopyId(
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ ParquetFieldIdConverter::IdConvertType convert_type) {
+ auto copy =
+ [&](const std::string& from,
+ const std::string& to) -> arrow::Result<std::shared_ptr<const
KeyValueMetadata>> {
+ if (!metadata || !metadata->Contains(from)) {
+ return metadata;
+ }
+ ARROW_ASSIGN_OR_RAISE(auto paimon_id, metadata->Get(from));
+ std::vector<std::string> keys = {to};
+ std::vector<std::string> values = {paimon_id};
+ auto new_meta = KeyValueMetadata::Make(keys, values);
+ return metadata->Merge(*new_meta);
+ };
+
+ if (convert_type ==
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID) {
+ // Write path: copy the Paimon field id into the PARQUET:field_id entry.
+ return copy(DataField::FIELD_ID, PARQUET_FIELD_ID);
+ } else if (convert_type ==
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID) {
+ // Read path: copy the PARQUET:field_id value back into the Paimon field id.
+ return copy(PARQUET_FIELD_ID, DataField::FIELD_ID);
+ }
+ return arrow::Status::Invalid("only support PAIMON_TO_PARQUET_ID and
PARQUET_TO_PAIMON_ID");
+}
+
+arrow::Result<std::shared_ptr<Field>> ParquetFieldIdConverter::ProcessField(
+ const std::shared_ptr<Field>& field,
ParquetFieldIdConverter::IdConvertType convert_type) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<const KeyValueMetadata>
updated_metadata,
+ CopyId(field->metadata(), convert_type));
+ auto type = field->type();
+ if (type->id() == arrow::Type::STRUCT) {
+ auto struct_type = std::static_pointer_cast<arrow::StructType>(type);
+ std::vector<std::shared_ptr<Field>> new_fields;
+ for (const auto& child : struct_type->fields()) {
+ ARROW_ASSIGN_OR_RAISE(auto new_child, ProcessField(child,
convert_type));
+ new_fields.push_back(new_child);
+ }
+ auto new_type = arrow::struct_(new_fields);
+ return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+ } else if (type->id() == arrow::Type::LIST) {
+ auto list_type = std::static_pointer_cast<arrow::ListType>(type);
+ ARROW_ASSIGN_OR_RAISE(auto new_value_field,
+ ProcessField(list_type->value_field(),
convert_type));
+ auto new_type = arrow::list(new_value_field);
+ return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+ } else if (type->id() == arrow::Type::MAP) {
+ auto map_type = std::static_pointer_cast<arrow::MapType>(type);
+ ARROW_ASSIGN_OR_RAISE(auto new_key_field,
+ ProcessField(map_type->key_field(),
convert_type));
+ ARROW_ASSIGN_OR_RAISE(auto new_item_field,
+ ProcessField(map_type->item_field(),
convert_type));
+ auto new_type = arrow::map(new_key_field->type(), new_item_field);
+ return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+ }
+
+ return field->WithMergedMetadata(updated_metadata);
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_field_id_converter.h
b/src/paimon/format/parquet/parquet_field_id_converter.h
new file mode 100644
index 0000000..4e26668
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+class ParquetFieldIdConverter {
+ public:
+ ParquetFieldIdConverter() = delete;
+ ~ParquetFieldIdConverter() = delete;
+
+ static const char PARQUET_FIELD_ID[];
+ // Iterate through all fields in the Schema.
+ // For each field, copy the value of 'paimon.id' to 'PARQUET:field_id'.
+ static Result<std::shared_ptr<arrow::Schema>> AddParquetIdsFromPaimonIds(
+ const std::shared_ptr<arrow::Schema>& schema);
+ // Iterate through all fields in the Schema.
+ // For each field, copy the value of 'PARQUET:field_id' to 'paimon.id'.
+ static Result<std::shared_ptr<arrow::Schema>> GetPaimonIdsFromParquetIds(
+ const std::shared_ptr<arrow::Schema>& schema);
+
+ private:
+ enum class IdConvertType {
+ PARQUET_TO_PAIMON_ID = 1,
+ PAIMON_TO_PARQUET_ID = 2,
+ };
+
+ static arrow::Result<std::shared_ptr<arrow::Field>> ProcessField(
+ const std::shared_ptr<arrow::Field>& field, IdConvertType
convert_type);
+
+ static arrow::Result<std::shared_ptr<const arrow::KeyValueMetadata>>
CopyId(
+ const std::shared_ptr<const arrow::KeyValueMetadata>& metadata,
IdConvertType convert_type);
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_field_id_converter_test.cpp
b/src/paimon/format/parquet/parquet_field_id_converter_test.cpp
new file mode 100644
index 0000000..e85cb44
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter_test.cpp
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+class ParquetFieldIdConverterTest : public ::testing::Test {
+ public:
+ class FieldInfo {
+ public:
+ std::string field_name;
+ arrow::Type::type field_type;
+ std::string field_id;
+
+ std::string ToString() const {
+ return field_name + " " + arrow::internal::ToString(field_type) +
" " + field_id;
+ }
+
+ bool operator==(const FieldInfo& rhs) const {
+ return field_name == rhs.field_name && field_type ==
rhs.field_type &&
+ field_id == rhs.field_id;
+ }
+ };
+
+ void SetUp() override {
+ print_ = false;
+ }
+
+ void PrintFieldMetadata(const std::shared_ptr<arrow::Schema>& schema,
+ ParquetFieldIdConverter::IdConvertType
convert_type,
+ std::vector<FieldInfo>* field_infos) const {
+ for (const auto& field : schema->fields()) {
+ PrintFieldMetadataRecursive(field, /*indent=*/2, convert_type,
field_infos);
+ }
+ }
+
+ void PrintFieldMetadataRecursive(const std::shared_ptr<arrow::Field>&
field, int32_t indent,
+ ParquetFieldIdConverter::IdConvertType
convert_type,
+ std::vector<FieldInfo>* field_infos)
const {
+ std::string prefix(indent * 2, ' ');
+ if (print_) {
+ std::cout << prefix << "Field: " << field->name() << " (" <<
field->type()->ToString()
+ << ")" << std::endl;
+ }
+
+ if (field->HasMetadata() && field->metadata()) {
+ auto meta = field->metadata();
+ std::string field_id;
+ if (convert_type ==
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID) {
+ field_id =
+
field->metadata()->Get(ParquetFieldIdConverter::PARQUET_FIELD_ID).ValueOrDie();
+ } else {
+ field_id =
field->metadata()->Get(DataField::FIELD_ID).ValueOrDie();
+ }
+ field_infos->push_back({field->name(), field->type()->id(),
field_id});
+ if (print_) {
+ for (int32_t i = 0; i < meta->size(); ++i) {
+ std::cout << prefix << " [meta] " << meta->key(i) << " =
" << meta->value(i)
+ << std::endl;
+ }
+ }
+ }
+
+ auto type_id = field->type()->id();
+ if (type_id == arrow::Type::STRUCT) {
+ auto struct_type =
std::static_pointer_cast<arrow::StructType>(field->type());
+ for (const auto& child : struct_type->fields()) {
+ PrintFieldMetadataRecursive(child, indent + 1, convert_type,
field_infos);
+ }
+ } else if (type_id == arrow::Type::LIST) {
+ auto list_type =
std::static_pointer_cast<arrow::ListType>(field->type());
+ PrintFieldMetadataRecursive(list_type->value_field(), indent + 1,
convert_type,
+ field_infos);
+ } else if (type_id == arrow::Type::MAP) {
+ auto map_type =
std::static_pointer_cast<arrow::MapType>(field->type());
+ PrintFieldMetadataRecursive(map_type->key_field(), indent + 1,
convert_type,
+ field_infos);
+ PrintFieldMetadataRecursive(map_type->item_field(), indent + 1,
convert_type,
+ field_infos);
+ }
+ }
+
+ private:
+ bool print_ = false;
+};
+
+TEST_F(ParquetFieldIdConverterTest, TestSimple) {
+ arrow::FieldVector fields = {
+ arrow::field("f1", arrow::boolean()),
+ arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()),
+ arrow::field("f4", arrow::int32()),
+ arrow::field("f5", arrow::int64()),
+ arrow::field("f6", arrow::float32()),
+ arrow::field("f7", arrow::float64()),
+ arrow::field("f8", arrow::utf8()),
+ arrow::field("f9", arrow::binary()),
+ arrow::field("f10", arrow::map(arrow::list(arrow::float32()),
+ arrow::struct_({arrow::field("k0",
arrow::boolean()),
+ arrow::field("v0",
arrow::int64())}))),
+ arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f12", arrow::date32()),
+ arrow::field("f13", arrow::decimal128(2, 2))};
+ auto schema = arrow::schema(fields);
+ ASSERT_OK_AND_ASSIGN(
+ auto table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
/*primary_keys=*/{},
+ /*options=*/{}));
+ auto schema_with_field_id =
DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
+ ASSERT_OK_AND_ASSIGN(auto new_schema,
+
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(schema_with_field_id));
+ // convert to PARQUET:field_id
+ std::vector<FieldInfo> field_infos;
+ PrintFieldMetadata(new_schema,
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID,
+ &field_infos);
+ std::vector<FieldInfo> expected_field_infos = {
+ {"f1", arrow::Type::BOOL, "0"}, {"f2", arrow::Type::INT8, "1"},
+ {"f3", arrow::Type::INT16, "2"}, {"f4", arrow::Type::INT32, "3"},
+ {"f5", arrow::Type::INT64, "4"}, {"f6", arrow::Type::FLOAT, "5"},
+ {"f7", arrow::Type::DOUBLE, "6"}, {"f8", arrow::Type::STRING,
"7"},
+ {"f9", arrow::Type::BINARY, "8"}, {"f10", arrow::Type::MAP, "9"},
+ {"k0", arrow::Type::BOOL, "10"}, {"v0", arrow::Type::INT64,
"11"},
+ {"f11", arrow::Type::TIMESTAMP, "12"}, {"f12", arrow::Type::DATE32,
"13"},
+ {"f13", arrow::Type::DECIMAL128, "14"}};
+ ASSERT_EQ(expected_field_infos, field_infos);
+ // convert to paimon.id
+ ASSERT_OK_AND_ASSIGN(auto old_schema,
+
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(new_schema));
+ std::vector<FieldInfo> old_field_infos;
+ PrintFieldMetadata(old_schema,
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID,
+ &old_field_infos);
+ ASSERT_EQ(expected_field_infos, old_field_infos);
+}
+
+TEST_F(ParquetFieldIdConverterTest, TestNestedType) {
+ arrow::FieldVector fields = {
+ arrow::field("f0",
+ arrow::struct_({arrow::field("sub1", arrow::date32()),
+ arrow::field("sub2",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("sub3",
arrow::decimal128(23, 5)),
+ arrow::field("sub4", arrow::binary()),
+ arrow::field("sub5", arrow::binary())})),
+ arrow::field("f1", arrow::list(arrow::struct_(
+ {arrow::field("sub1", arrow::date32()),
+ arrow::field("sub2",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("sub3", arrow::decimal128(23, 5)),
+ arrow::field("sub4", arrow::binary()),
+ arrow::field("sub5", arrow::binary())}))),
+ arrow::field(
+ "f2", arrow::map(
+ arrow::struct_({arrow::field("sub1", arrow::date32()),
+ arrow::field("sub2",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("sub3",
arrow::decimal128(23, 5)),
+ arrow::field("sub4", arrow::binary()),
+ arrow::field("sub5", arrow::binary())}),
+ arrow::struct_({arrow::field("sub1", arrow::date32()),
+ arrow::field("sub2",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("sub3",
arrow::decimal128(23, 5)),
+ arrow::field("sub4", arrow::binary()),
+ arrow::field("sub5",
arrow::binary())})))};
+ auto schema = arrow::schema(fields);
+ ASSERT_OK_AND_ASSIGN(
+ auto table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
/*primary_keys=*/{},
+ /*options=*/{}));
+ auto schema_with_field_id =
DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
+ ASSERT_OK_AND_ASSIGN(auto new_schema,
+
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(schema_with_field_id));
+ // convert to PARQUET:field_id
+ std::vector<FieldInfo> field_infos;
+ PrintFieldMetadata(new_schema,
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID,
+ &field_infos);
+ std::vector<FieldInfo> expected_field_infos = {
+ {"f0", arrow::Type::STRUCT, "0"}, {"sub1", arrow::Type::DATE32,
"1"},
+ {"sub2", arrow::Type::TIMESTAMP, "2"}, {"sub3",
arrow::Type::DECIMAL128, "3"},
+ {"sub4", arrow::Type::BINARY, "4"}, {"sub5", arrow::Type::BINARY,
"5"},
+ {"f1", arrow::Type::LIST, "6"}, {"sub1", arrow::Type::DATE32,
"7"},
+ {"sub2", arrow::Type::TIMESTAMP, "8"}, {"sub3",
arrow::Type::DECIMAL128, "9"},
+ {"sub4", arrow::Type::BINARY, "10"}, {"sub5", arrow::Type::BINARY,
"11"},
+ {"f2", arrow::Type::MAP, "12"}, {"sub1", arrow::Type::DATE32,
"13"},
+ {"sub2", arrow::Type::TIMESTAMP, "14"}, {"sub3",
arrow::Type::DECIMAL128, "15"},
+ {"sub4", arrow::Type::BINARY, "16"}, {"sub5", arrow::Type::BINARY,
"17"},
+ {"sub1", arrow::Type::DATE32, "18"}, {"sub2",
arrow::Type::TIMESTAMP, "19"},
+ {"sub3", arrow::Type::DECIMAL128, "20"}, {"sub4", arrow::Type::BINARY,
"21"},
+ {"sub5", arrow::Type::BINARY, "22"}};
+ ASSERT_EQ(expected_field_infos, field_infos);
+ // convert to paimon.id
+ ASSERT_OK_AND_ASSIGN(auto old_schema,
+
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(new_schema));
+ std::vector<FieldInfo> old_field_infos;
+ PrintFieldMetadata(old_schema,
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID,
+ &old_field_infos);
+ ASSERT_EQ(expected_field_infos, old_field_infos);
+}
+
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_file_format.h
b/src/paimon/format/parquet/parquet_file_format.h
new file mode 100644
index 0000000..d19ef31
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_reader_builder.h"
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class WriterBuilder;
+class ReaderBuilder;
+class FormatStatsExtractor;
+
+namespace parquet {
+
+class ParquetFileFormat : public FileFormat {
+ public:
+ explicit ParquetFileFormat(const std::map<std::string, std::string>&
options)
+ : identifier_("parquet"), options_(options) {}
+
+ const std::string& Identifier() const override {
+ return identifier_;
+ }
+
+ Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(int32_t
batch_size) const override {
+ return std::make_unique<ParquetReaderBuilder>(options_, batch_size);
+ }
+
+ Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(::ArrowSchema*
schema,
+ int32_t
batch_size) const override {
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema>
typed_schema,
+ arrow::ImportSchema(schema));
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> new_schema,
+
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(typed_schema));
+ return std::make_unique<ParquetWriterBuilder>(new_schema, batch_size,
options_);
+ }
+
+ Result<std::unique_ptr<FormatStatsExtractor>> CreateStatsExtractor(
+ ::ArrowSchema* schema) const override {
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema>
typed_schema,
+ arrow::ImportSchema(schema));
+ return std::make_unique<ParquetStatsExtractor>(typed_schema);
+ }
+
+ protected:
+ std::string identifier_;
+ std::map<std::string, std::string> options_;
+};
+
+} // namespace parquet
+} // namespace paimon
diff --git a/src/paimon/format/parquet/parquet_file_format_factory.cpp
b/src/paimon/format/parquet/parquet_file_format_factory.cpp
new file mode 100644
index 0000000..bef5b9f
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_format_factory.h"
+
+#include <utility>
+
+#include "paimon/factories/factory.h"
+#include "paimon/format/parquet/parquet_file_format.h"
+
+namespace paimon::parquet {
+
+const char ParquetFileFormatFactory::IDENTIFIER[] = "parquet";
+
+Result<std::unique_ptr<FileFormat>> ParquetFileFormatFactory::Create(
+ const std::map<std::string, std::string>& options) const {
+ return std::make_unique<ParquetFileFormat>(options);
+}
+
+REGISTER_PAIMON_FACTORY(ParquetFileFormatFactory);
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_format_factory.h
b/src/paimon/format/parquet/parquet_file_format_factory.h
new file mode 100644
index 0000000..87aa121
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format_factory.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+class ParquetFileFormatFactory : public FileFormatFactory {
+ public:
+ static const char IDENTIFIER[];
+
+ const char* Identifier() const override {
+ return IDENTIFIER;
+ }
+
+ Result<std::unique_ptr<FileFormat>> Create(
+ const std::map<std::string, std::string>& options) const override;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_defs.h
b/src/paimon/format/parquet/parquet_format_defs.h
new file mode 100644
index 0000000..a1ae347
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_defs.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+namespace paimon::parquet {
+
+// write
+static inline const char PARQUET_BLOCK_SIZE[] = "parquet.block.size";
+static inline const char PARQUET_PAGE_SIZE[] = "parquet.page.size";
+static inline const char PARQUET_DICTIONARY_PAGE_SIZE[] =
"parquet.dictionary.page.size";
+static inline const char PARQUET_ENABLE_DICTIONARY[] =
"parquet.enable-dictionary";
+static inline const char PARQUET_WRITER_VERSION[] = "parquet.writer.version";
+static inline const char PARQUET_WRITE_MAX_ROW_GROUP_LENGTH[] =
+ "parquet.write.max-row-group-length";
+static constexpr int64_t DEFAULT_PARQUET_WRITE_MAX_ROW_GROUP_LENGTH =
+ std::numeric_limits<int64_t>::max();
+static inline const char PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL[] =
+ "parquet.compression.codec.zstd.level";
+static inline const char PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL[] =
"zlib.compress.level";
+static inline const char PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL[] =
"compression.brotli.quality";
+static inline const char PARQUET_WRITER_MAX_MEMORY_USE[] =
"parquet.writer.max.memory.use";
+static constexpr uint64_t DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE = 512 * 1024 *
1024; // 512MB
+
+// read
+static inline const char PARQUET_READ_EXECUTOR_THREAD_COUNT[] =
+ "parquet.read.executor.thread-count";
+static constexpr uint32_t DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT = 3;
+static inline const char PARQUET_READ_CACHE_OPTION_LAZY[] =
"parquet.read.cache-option.lazy";
+static inline const char PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT[] =
+ "parquet.read.cache-option.prefetch-limit";
+static inline const char PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT[] =
+ "parquet.read.cache-option.range-size-limit";
+
+// A stack overflow may occur when the number of predicate nodes is too large,
so limit the number of
+// predicate nodes. Predicates are not pushed down once the limit is exceeded.
+static inline const char PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT[] =
+ "parquet.read.predicate-node-count-limit";
+
+// Default is true. Compaction sets it to false to reduce memory consumption.
+static inline const char PARQUET_READ_ENABLE_PRE_BUFFER[] =
"parquet.read.enable-pre-buffer";
+
+static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT = 0;
+static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT =
32 * 1024 * 1024;
+static constexpr uint32_t DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT =
512;
+
+class ParquetMetrics {
+ public:
+ static inline const char WRITE_RECORD_COUNT[] =
"parquet.write.record.count";
+
+ // read
+ static inline const char READ_ROW_GROUPS_TOTAL[] =
"parquet.read.row-groups.total";
+ static inline const char READ_ROW_GROUPS_AFTER_FILTER[] =
+ "parquet.read.row-groups.after-filter";
+ static inline const char READ_ROWS[] = "parquet.read.rows";
+ static inline const char READ_BATCH_COUNT[] = "parquet.read.batch-count";
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_reader_builder.h
b/src/paimon/format/parquet/parquet_reader_builder.h
new file mode 100644
index 0000000..2a93696
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_reader_builder.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+/// ReaderBuilder implementation that creates Parquet file batch readers.
+///
+/// The options map and the batch size given at construction are forwarded unchanged to
+/// ParquetFileBatchReader::Create(). The memory pool starts as GetDefaultPool() and can be
+/// replaced through WithMemoryPool().
+class ParquetReaderBuilder : public ReaderBuilder {
+ public:
+    ParquetReaderBuilder(const std::map<std::string, std::string>& options, int32_t batch_size)
+        : batch_size_(batch_size), pool_(GetDefaultPool()), options_(options) {}
+
+    /// Sets the pool used by subsequent Build() calls and returns this builder for chaining.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
+        pool_ = pool;
+        return this;
+    }
+
+    /// Creates a reader on top of an input stream (the parameter is a stream despite its name).
+    ///
+    /// The stream length is queried first; the stream is then wrapped in an
+    /// ArrowInputStreamAdapter that shares the Arrow pool derived from the configured pool.
+    ///
+    /// @return the reader, the error raised by Length(), the error raised by
+    ///         ParquetFileBatchReader::Create(), or Status::Invalid when the stream is null.
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        // Fail with a status instead of dereferencing a null stream below.
+        if (path == nullptr) {
+            return Status::Invalid("input stream is null when building parquet reader");
+        }
+        PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, path->Length());
+        std::shared_ptr<arrow::MemoryPool> arrow_pool = GetArrowPool(pool_);
+        auto input_stream =
+            std::make_unique<ArrowInputStreamAdapter>(path, arrow_pool, file_length);
+        return ParquetFileBatchReader::Create(std::move(input_stream), arrow_pool, options_,
+                                              batch_size_);
+    }
+
+    /// Building from a file path is not supported by this builder; always returns Invalid.
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& /*path*/) const override {
+        return Status::Invalid("do not support build reader with path in parquet format");
+    }
+
+ private:
+    int32_t batch_size_ = -1;
+    std::shared_ptr<MemoryPool> pool_;
+    std::map<std::string, std::string> options_;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_schema_util.cpp
b/src/paimon/format/parquet/parquet_schema_util.cpp
new file mode 100644
index 0000000..b94f32d
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_schema_util.cpp
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adapted from Apache Arrow
+//
https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/schema_internal.cc
+
+#include "paimon/format/parquet/parquet_schema_util.h"
+
+#include <string>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+using ArrowType = ::arrow::DataType;
+using ArrowTypeId = ::arrow::Type;
+using ParquetType = parquet::Type;
+
+namespace paimon::parquet {
+
+using ::arrow::Result;
+using ::arrow::Status;
+using ::arrow::internal::checked_cast;
+
+// Converts a Parquet DECIMAL annotation into an Arrow decimal type with the same precision and
+// scale. Decimal128 is used while the precision fits; larger precisions use Decimal256.
+Result<std::shared_ptr<ArrowType>> MakeArrowDecimal(const ::parquet::LogicalType& logical_type) {
+    const auto& decimal_type = checked_cast<const ::parquet::DecimalLogicalType&>(logical_type);
+    const bool needs_256_bits =
+        decimal_type.precision() > ::arrow::Decimal128Type::kMaxPrecision;
+    if (needs_256_bits) {
+        return ::arrow::Decimal256Type::Make(decimal_type.precision(), decimal_type.scale());
+    }
+    return ::arrow::Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
+}
+
+// Converts a Parquet INT annotation on an INT32 column into the Arrow integer type with the
+// same width and signedness. Widths other than 8, 16 and 32 are rejected with a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowInt(const ::parquet::LogicalType& logical_type) {
+    const auto& int_type = checked_cast<const ::parquet::IntLogicalType&>(logical_type);
+    const auto bit_width = int_type.bit_width();
+    if (bit_width == 8) {
+        return int_type.is_signed() ? ::arrow::int8() : ::arrow::uint8();
+    }
+    if (bit_width == 16) {
+        return int_type.is_signed() ? ::arrow::int16() : ::arrow::uint16();
+    }
+    if (bit_width == 32) {
+        return int_type.is_signed() ? ::arrow::int32() : ::arrow::uint32();
+    }
+    return Status::TypeError(logical_type.ToString(), " cannot annotate physical type Int32");
+}
+
+// Converts a Parquet INT annotation on an INT64 column into arrow::int64() or arrow::uint64().
+// Any width other than 64 is rejected with a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowInt64(const ::parquet::LogicalType& logical_type) {
+    const auto& int_type = checked_cast<const ::parquet::IntLogicalType&>(logical_type);
+    if (int_type.bit_width() != 64) {
+        return Status::TypeError(logical_type.ToString(), " cannot annotate physical type Int64");
+    }
+    if (int_type.is_signed()) {
+        return ::arrow::int64();
+    }
+    return ::arrow::uint64();
+}
+
+// Converts a Parquet TIME annotation into arrow::time32. Only the millisecond unit is accepted;
+// every other unit yields a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowTime32(const ::parquet::LogicalType& logical_type) {
+    const auto& time_type = checked_cast<const ::parquet::TimeLogicalType&>(logical_type);
+    if (time_type.time_unit() == ::parquet::LogicalType::TimeUnit::MILLIS) {
+        return ::arrow::time32(::arrow::TimeUnit::MILLI);
+    }
+    return Status::TypeError(logical_type.ToString(), " cannot annotate physical type Time32");
+}
+
+// Converts a Parquet TIME annotation into arrow::time64. Microsecond and nanosecond units are
+// accepted; every other unit yields a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowTime64(const ::parquet::LogicalType& logical_type) {
+    const auto& time_type = checked_cast<const ::parquet::TimeLogicalType&>(logical_type);
+    const auto unit = time_type.time_unit();
+    if (unit == ::parquet::LogicalType::TimeUnit::MICROS) {
+        return ::arrow::time64(::arrow::TimeUnit::MICRO);
+    }
+    if (unit == ::parquet::LogicalType::TimeUnit::NANOS) {
+        return ::arrow::time64(::arrow::TimeUnit::NANO);
+    }
+    return Status::TypeError(logical_type.ToString(), " cannot annotate physical type Time64");
+}
+
+// Converts a Parquet TIMESTAMP annotation into an Arrow timestamp type.
+//
+// The Parquet unit (MILLIS / MICROS / NANOS) maps to the matching Arrow unit; any other unit
+// yields a TypeError. When the annotation is adjusted to UTC, the Arrow type carries the zone
+// name returned by DateTimeUtils::GetLocalTimezoneName(); otherwise it carries no zone.
+Result<std::shared_ptr<ArrowType>> MakeArrowTimestamp(const ::parquet::LogicalType& logical_type) {
+    const auto& timestamp = checked_cast<const ::parquet::TimestampLogicalType&>(logical_type);
+
+    // Resolve the unit once so the zone handling below is written a single time.
+    ::arrow::TimeUnit::type unit = ::arrow::TimeUnit::MILLI;
+    switch (timestamp.time_unit()) {
+        case ::parquet::LogicalType::TimeUnit::MILLIS:
+            unit = ::arrow::TimeUnit::MILLI;
+            break;
+        case ::parquet::LogicalType::TimeUnit::MICROS:
+            unit = ::arrow::TimeUnit::MICRO;
+            break;
+        case ::parquet::LogicalType::TimeUnit::NANOS:
+            unit = ::arrow::TimeUnit::NANO;
+            break;
+        default:
+            return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
+                                     logical_type.ToString());
+    }
+
+    if (!timestamp.is_adjusted_to_utc()) {
+        return ::arrow::timestamp(unit);
+    }
+    // The zone name is looked up only when it is actually attached to the type.
+    return ::arrow::timestamp(unit, DateTimeUtils::GetLocalTimezoneName());
+}
+
+// Picks the Arrow type for a BYTE_ARRAY column from its logical annotation:
+// STRING -> utf8, DECIMAL -> decimal, NONE / ENUM / JSON / BSON -> binary.
+// Any other annotation yields NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromByteArray(const ::parquet::LogicalType& logical_type) {
+    using Annotation = ::parquet::LogicalType::Type;
+    switch (logical_type.type()) {
+        case Annotation::NONE:
+        case Annotation::ENUM:
+        case Annotation::JSON:
+        case Annotation::BSON:
+            return ::arrow::binary();
+        case Annotation::STRING:
+            return ::arrow::utf8();
+        case Annotation::DECIMAL:
+            return MakeArrowDecimal(logical_type);
+        default:
+            return Status::NotImplemented("Unhandled logical logical_type ",
+                                          logical_type.ToString(), " for binary array");
+    }
+}
+
+// Picks the Arrow type for a FIXED_LEN_BYTE_ARRAY column from its logical annotation:
+// DECIMAL -> decimal, FLOAT16 -> float16, NONE / INTERVAL / UUID -> fixed_size_binary of
+// `physical_length` bytes. Any other annotation yields NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromFLBA(const ::parquet::LogicalType& logical_type,
+                                            int32_t physical_length) {
+    using Annotation = ::parquet::LogicalType::Type;
+    switch (logical_type.type()) {
+        case Annotation::NONE:
+        case Annotation::INTERVAL:
+        case Annotation::UUID:
+            return ::arrow::fixed_size_binary(physical_length);
+        case Annotation::FLOAT16:
+            return ::arrow::float16();
+        case Annotation::DECIMAL:
+            return MakeArrowDecimal(logical_type);
+        default:
+            return Status::NotImplemented("Unhandled logical logical_type ",
+                                          logical_type.ToString(),
+                                          " for fixed-length binary array");
+    }
+}
+
+// Picks the Arrow type for an INT32 column from its logical annotation:
+// NONE -> int32, INT -> sized integer, DATE -> date32, TIME -> time32, DECIMAL -> decimal.
+// Any other annotation yields NotImplemented.
+::arrow::Result<std::shared_ptr<ArrowType>> FromInt32(const ::parquet::LogicalType& logical_type) {
+    using Annotation = ::parquet::LogicalType::Type;
+    switch (logical_type.type()) {
+        case Annotation::NONE:
+            return ::arrow::int32();
+        case Annotation::DATE:
+            return ::arrow::date32();
+        case Annotation::INT:
+            return MakeArrowInt(logical_type);
+        case Annotation::TIME:
+            return MakeArrowTime32(logical_type);
+        case Annotation::DECIMAL:
+            return MakeArrowDecimal(logical_type);
+        default:
+            return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                          " for INT32");
+    }
+}
+
+// Picks the Arrow type for an INT64 column from its logical annotation:
+// NONE -> int64, INT -> 64-bit integer, TIMESTAMP -> timestamp, TIME -> time64,
+// DECIMAL -> decimal. Any other annotation yields NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromInt64(const ::parquet::LogicalType& logical_type) {
+    using Annotation = ::parquet::LogicalType::Type;
+    switch (logical_type.type()) {
+        case Annotation::NONE:
+            return ::arrow::int64();
+        case Annotation::INT:
+            return MakeArrowInt64(logical_type);
+        case Annotation::TIME:
+            return MakeArrowTime64(logical_type);
+        case Annotation::TIMESTAMP:
+            return MakeArrowTimestamp(logical_type);
+        case Annotation::DECIMAL:
+            return MakeArrowDecimal(logical_type);
+        default:
+            return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                          " for INT64");
+    }
+}
+
+// Resolves the Arrow type of a Parquet column from its physical type and logical annotation.
+//
+// An invalid or null annotation always maps to arrow::null(). BOOLEAN, FLOAT and DOUBLE map
+// directly; INT96 maps to a zone-less timestamp with `int96_arrow_time_unit`; the remaining
+// physical types delegate to the From* helper that interprets the annotation. `type_length` is
+// only used for FIXED_LEN_BYTE_ARRAY. An unknown physical type yields an IOError.
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    ::parquet::Type::type physical_type, const ::parquet::LogicalType& logical_type,
+    int32_t type_length, const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const bool annotation_unusable = logical_type.is_invalid() || logical_type.is_null();
+    if (annotation_unusable) {
+        return ::arrow::null();
+    }
+
+    switch (physical_type) {
+        // Physical types whose Arrow type does not depend on the annotation.
+        case ParquetType::BOOLEAN:
+            return ::arrow::boolean();
+        case ParquetType::FLOAT:
+            return ::arrow::float32();
+        case ParquetType::DOUBLE:
+            return ::arrow::float64();
+        case ParquetType::INT96:
+            return ::arrow::timestamp(int96_arrow_time_unit);
+        // Physical types refined by the logical annotation.
+        case ParquetType::INT32:
+            return FromInt32(logical_type);
+        case ParquetType::INT64:
+            return FromInt64(logical_type);
+        case ParquetType::BYTE_ARRAY:
+            return FromByteArray(logical_type);
+        case ParquetType::FIXED_LEN_BYTE_ARRAY:
+            return FromFLBA(logical_type, type_length);
+        default:
+            // PARQUET-1565: This can occur if the file is corrupt
+            return Status::IOError("Invalid physical column type: ",
+                                   TypeToString(physical_type));
+    }
+}
+
+// Convenience overload: reads the physical type, logical annotation and type length from a
+// primitive schema node and forwards them to the overload above.
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    const ::parquet::schema::PrimitiveNode& primitive,
+    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const ::parquet::LogicalType& annotation = *primitive.logical_type();
+    const auto physical = primitive.physical_type();
+    const auto length = primitive.type_length();
+    return GetArrowType(physical, annotation, length, int96_arrow_time_unit);
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_schema_util.h
b/src/paimon/format/parquet/parquet_schema_util.h
new file mode 100644
index 0000000..b8eea4e
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_schema_util.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adapted from Apache Arrow
+//
https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/schema_internal.h
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet::schema {
+class PrimitiveNode;
+} // namespace parquet::schema
+
+namespace arrow {
+class DataType;
+}
+
+namespace paimon::parquet {
+
+// Arrow type for a BYTE_ARRAY column carrying `logical_type`; NotImplemented for annotations
+// the implementation does not handle.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromByteArray(
+ const ::parquet::LogicalType& logical_type);
+// Arrow type for a FIXED_LEN_BYTE_ARRAY column of `physical_length` bytes carrying
+// `logical_type`; NotImplemented for annotations the implementation does not handle.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromFLBA(
+ const ::parquet::LogicalType& logical_type, int32_t physical_length);
+// Arrow type for an INT32 column carrying `logical_type`; NotImplemented for annotations the
+// implementation does not handle.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromInt32(
+ const ::parquet::LogicalType& logical_type);
+// Arrow type for an INT64 column carrying `logical_type`; NotImplemented for annotations the
+// implementation does not handle.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromInt64(
+ const ::parquet::LogicalType& logical_type);
+
+// Arrow type for a column described by its physical type, logical annotation and type length.
+// `int96_arrow_time_unit` is the unit given to timestamps produced for INT96 columns and
+// defaults to nanoseconds.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
+ ::parquet::Type::type physical_type, const ::parquet::LogicalType&
logical_type,
+ int32_t type_length, ::arrow::TimeUnit::type int96_arrow_time_unit =
::arrow::TimeUnit::NANO);
+
+// Same as above, with the physical type, annotation and type length taken from `primitive`.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
+ const ::parquet::schema::PrimitiveNode& primitive,
+ ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.cpp
b/src/paimon/format/parquet/parquet_timestamp_converter.cpp
new file mode 100644
index 0000000..c9c9e75
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter.cpp
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/core/casting/timestamp_to_timestamp_cast_executor.h"
+
+namespace paimon::parquet {
+// Returns a copy of `src_data_type` in which every timestamp type that carries a timezone is
+// replaced by a timestamp of the same unit carrying DateTimeUtils::GetLocalTimezoneName().
+//
+// Struct, map and list types are rebuilt recursively from their adjusted children; each child
+// field is derived with Field::WithType() from the source field. Zone-less timestamps and all
+// other types are returned unchanged.
+Result<std::shared_ptr<arrow::DataType>> ParquetTimestampConverter::AdjustTimezone(
+    const std::shared_ptr<arrow::DataType>& src_data_type) {
+    switch (src_data_type->id()) {
+        case arrow::Type::type::STRUCT: {
+            const auto* src_struct_type =
+                arrow::internal::checked_cast<const arrow::StructType*>(src_data_type.get());
+            arrow::FieldVector new_fields;
+            new_fields.reserve(src_struct_type->num_fields());
+            for (int32_t i = 0; i < src_struct_type->num_fields(); ++i) {
+                PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> sub_type,
+                                       AdjustTimezone(src_struct_type->field(i)->type()));
+                new_fields.push_back(src_struct_type->field(i)->WithType(sub_type));
+            }
+            return arrow::struct_(new_fields);
+        }
+        case arrow::Type::type::MAP: {
+            const auto* src_map_type =
+                arrow::internal::checked_cast<const arrow::MapType*>(src_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> key_type,
+                                   AdjustTimezone(src_map_type->key_type()));
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> item_type,
+                                   AdjustTimezone(src_map_type->item_type()));
+            // Carry keys_sorted over so only the timestamp zones differ from the source type.
+            return std::make_shared<arrow::MapType>(
+                src_map_type->key_field()->WithType(key_type),
+                src_map_type->item_field()->WithType(item_type), src_map_type->keys_sorted());
+        }
+        case arrow::Type::type::LIST: {
+            const auto* src_list_type =
+                arrow::internal::checked_cast<const arrow::ListType*>(src_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> value_type,
+                                   AdjustTimezone(src_list_type->value_type()));
+            return arrow::list(src_list_type->value_field()->WithType(value_type));
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            const auto* src_ts_type =
+                arrow::internal::checked_cast<const arrow::TimestampType*>(src_data_type.get());
+            if (src_ts_type->timezone().empty()) {
+                // Zone-less timestamps are kept as they are.
+                return src_data_type;
+            }
+            return arrow::timestamp(src_ts_type->unit(), DateTimeUtils::GetLocalTimezoneName());
+        }
+        default:
+            return src_data_type;
+    }
+}
+
+// Tells whether an array of `src_data_type` contains, at any nesting level, a timestamp whose
+// unit or timezone differs from the timestamp at the same position in `target_data_type`.
+//
+// Both types must have the same type id at every visited level, and structs must have the same
+// number of fields; otherwise an Invalid status is returned. Traversal stops at the first
+// child that needs a cast, so later children are not validated in that case.
+Result<bool> ParquetTimestampConverter::NeedCastArrayForTimestamp(
+    const std::shared_ptr<arrow::DataType>& src_data_type,
+    const std::shared_ptr<arrow::DataType>& target_data_type) {
+    const arrow::Type::type type_id = src_data_type->id();
+    if (target_data_type->id() != type_id) {
+        return Status::Invalid(fmt::format("src type {} and target type {} mismatch",
+                                           src_data_type->ToString(),
+                                           target_data_type->ToString()));
+    }
+
+    if (type_id == arrow::Type::type::TIMESTAMP) {
+        auto* src_ts =
+            arrow::internal::checked_cast<arrow::TimestampType*>(src_data_type.get());
+        auto* target_ts =
+            arrow::internal::checked_cast<arrow::TimestampType*>(target_data_type.get());
+        return src_ts->unit() != target_ts->unit() ||
+               src_ts->timezone() != target_ts->timezone();
+    }
+
+    if (type_id == arrow::Type::type::LIST) {
+        auto* src_list = arrow::internal::checked_cast<arrow::ListType*>(src_data_type.get());
+        auto* target_list =
+            arrow::internal::checked_cast<arrow::ListType*>(target_data_type.get());
+        return NeedCastArrayForTimestamp(src_list->value_type(), target_list->value_type());
+    }
+
+    if (type_id == arrow::Type::type::MAP) {
+        auto* src_map = arrow::internal::checked_cast<arrow::MapType*>(src_data_type.get());
+        auto* target_map =
+            arrow::internal::checked_cast<arrow::MapType*>(target_data_type.get());
+        PAIMON_ASSIGN_OR_RAISE(
+            bool key_needs_cast,
+            NeedCastArrayForTimestamp(src_map->key_type(), target_map->key_type()));
+        if (key_needs_cast) {
+            return true;
+        }
+        // The items are only inspected when the keys need no cast.
+        return NeedCastArrayForTimestamp(src_map->item_type(), target_map->item_type());
+    }
+
+    if (type_id == arrow::Type::type::STRUCT) {
+        auto* src_struct = arrow::internal::checked_cast<arrow::StructType*>(src_data_type.get());
+        auto* target_struct =
+            arrow::internal::checked_cast<arrow::StructType*>(target_data_type.get());
+        const int32_t field_count = src_struct->num_fields();
+        if (field_count != target_struct->num_fields()) {
+            return Status::Invalid(
+                fmt::format("src type {} and target type {} number of fields mismatch",
+                            src_data_type->ToString(), target_data_type->ToString()));
+        }
+        for (int32_t idx = 0; idx < field_count; ++idx) {
+            PAIMON_ASSIGN_OR_RAISE(bool child_needs_cast,
+                                   NeedCastArrayForTimestamp(src_struct->field(idx)->type(),
+                                                             target_struct->field(idx)->type()));
+            if (child_needs_cast) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Every other type holds no timestamp, so nothing has to be cast.
+    return false;
+}
+
+// Converts the timestamps nested in `array` so that the result matches `target_data_type`.
+//
+// Struct, map and list arrays are rebuilt around their converted children while keeping the
+// parent's length, offset, validity bitmap and (for map/list) value offsets. For a timestamp:
+//   * MILLI -> SECOND is cast through TimestampToTimestampCastExecutor;
+//   * otherwise, when only the timezone differs, the array is re-viewed as the target type;
+//   * otherwise it is returned unchanged.
+// Arrays of any other type are returned unchanged, whatever the target type is.
+//
+// Returns Invalid when `array` is a struct/map/list/timestamp and `target_data_type` has a
+// different type id, or when a struct's field count differs from the target's.
+Result<std::shared_ptr<arrow::Array>> ParquetTimestampConverter::CastArrayForTimestamp(
+    const std::shared_ptr<arrow::Array>& array,
+    const std::shared_ptr<arrow::DataType>& target_data_type,
+    const std::shared_ptr<arrow::MemoryPool>& arrow_pool) {
+    const arrow::Type::type type = array->type()->id();
+    const bool is_handled_kind =
+        type == arrow::Type::type::STRUCT || type == arrow::Type::type::MAP ||
+        type == arrow::Type::type::LIST || type == arrow::Type::type::TIMESTAMP;
+    if (!is_handled_kind) {
+        return array;
+    }
+    // The target type is downcast below, so its kind has to match the array's kind.
+    if (target_data_type->id() != type) {
+        return Status::Invalid(fmt::format("src type {} and target type {} mismatch",
+                                           array->type()->ToString(),
+                                           target_data_type->ToString()));
+    }
+
+    switch (type) {
+        case arrow::Type::type::STRUCT: {
+            auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(array.get());
+            const int32_t num_fields = struct_array->num_fields();
+            if (num_fields != target_data_type->num_fields()) {
+                return Status::Invalid(
+                    fmt::format("src type {} and target type {} number of fields mismatch",
+                                array->type()->ToString(), target_data_type->ToString()));
+            }
+            arrow::ArrayVector target_sub_arrays;
+            arrow::FieldVector target_fields;
+            target_sub_arrays.reserve(num_fields);
+            target_fields.reserve(num_fields);
+            for (int32_t i = 0; i < num_fields; i++) {
+                // Convert the unsliced child: StructArray::field(i) already applies the parent
+                // offset, so pairing it with the parent offset again would shift the rows twice.
+                std::shared_ptr<arrow::Array> child =
+                    arrow::MakeArray(struct_array->data()->child_data[i]);
+                PAIMON_ASSIGN_OR_RAISE(
+                    std::shared_ptr<arrow::Array> sub_array,
+                    CastArrayForTimestamp(child, target_data_type->field(i)->type(), arrow_pool));
+                target_fields.push_back(
+                    arrow::field(target_data_type->field(i)->name(), sub_array->type()));
+                target_sub_arrays.push_back(std::move(sub_array));
+            }
+            return std::make_shared<arrow::StructArray>(
+                arrow::struct_(target_fields), struct_array->length(), target_sub_arrays,
+                struct_array->null_bitmap(), struct_array->null_count(), struct_array->offset());
+        }
+        case arrow::Type::type::MAP: {
+            auto* map_array = arrow::internal::checked_cast<arrow::MapArray*>(array.get());
+            auto* map_type =
+                arrow::internal::checked_cast<arrow::MapType*>(target_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> key_array,
+                CastArrayForTimestamp(map_array->keys(), map_type->key_type(), arrow_pool));
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> item_array,
+                CastArrayForTimestamp(map_array->items(), map_type->item_type(), arrow_pool));
+            return std::make_shared<arrow::MapArray>(
+                arrow::map(key_array->type(), item_array->type()), map_array->length(),
+                map_array->value_offsets(), key_array, item_array, map_array->null_bitmap(),
+                map_array->null_count(), map_array->offset());
+        }
+        case arrow::Type::type::LIST: {
+            auto* list_array = arrow::internal::checked_cast<arrow::ListArray*>(array.get());
+            auto* list_type =
+                arrow::internal::checked_cast<arrow::ListType*>(target_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> value_array,
+                CastArrayForTimestamp(list_array->values(), list_type->value_type(), arrow_pool));
+            return std::make_shared<arrow::ListArray>(
+                arrow::list(value_array->type()), list_array->length(),
+                list_array->value_offsets(), value_array, list_array->null_bitmap(),
+                list_array->null_count(), list_array->offset());
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto* ts_array = arrow::internal::checked_cast<arrow::TimestampArray*>(array.get());
+            auto* src_type =
+                arrow::internal::checked_cast<arrow::TimestampType*>(ts_array->type().get());
+            auto* ts_target_type =
+                arrow::internal::checked_cast<arrow::TimestampType*>(target_data_type.get());
+            if (src_type->unit() == arrow::TimeUnit::type::MILLI &&
+                ts_target_type->unit() == arrow::TimeUnit::type::SECOND) {
+                // The parquet writer does not support second precision and stores seconds as
+                // milliseconds, so the paimon file reader casts milliseconds back to seconds.
+                auto cast_executor = std::make_shared<TimestampToTimestampCastExecutor>();
+                PAIMON_ASSIGN_OR_RAISE(
+                    std::shared_ptr<arrow::Array> target_array,
+                    cast_executor->Cast(array, target_data_type, arrow_pool.get()));
+                return target_array;
+            }
+            if (src_type->timezone() != ts_target_type->timezone()) {
+                // 1. For nano precision, the parquet writer stores the value as int96, which
+                //    carries neither stats nor zone info, so the paimon file reader adds the
+                //    zone info of the target type.
+                // 2. For other precisions, the parquet reader returns a UTC zone, so the paimon
+                //    file reader replaces it with the local zone info.
+                PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> target_array,
+                                                  ts_array->View(target_data_type));
+                return target_array;
+            }
+            // Same zone and no MILLI -> SECOND conversion: nothing to change.
+            return array;
+        }
+        default:
+            return array;
+    }
+}
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.h
b/src/paimon/format/parquet/parquet_timestamp_converter.h
new file mode 100644
index 0000000..2954bfe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+// Collection of static helpers that reconcile the Arrow timestamp types and arrays read from
+// Parquet with the timestamp types expected by the caller. The class cannot be instantiated:
+// both its constructor and destructor are deleted.
+class ParquetTimestampConverter {
+ public:
+ ParquetTimestampConverter() = delete;
+ ~ParquetTimestampConverter() = delete;
+
+ // Returns `src_data_type` with the timezone of every zoned timestamp (including those nested
+ // in struct, map and list types) replaced by the local timezone name.
+ static Result<std::shared_ptr<arrow::DataType>> AdjustTimezone(
+ const std::shared_ptr<arrow::DataType>& src_data_type);
+
+ // Returns true when some timestamp in `src_data_type` differs in unit or timezone from the
+ // timestamp at the same position in `target_data_type`. Fails with Invalid when the two
+ // types do not have the same type ids or struct field counts.
+ static Result<bool> NeedCastArrayForTimestamp(
+ const std::shared_ptr<arrow::DataType>& src_data_type,
+ const std::shared_ptr<arrow::DataType>& target_data_type);
+
+ // Returns `array` with its (possibly nested) timestamps converted towards
+ // `target_data_type`; `arrow_pool` is handed to the cast executor used for unit conversion.
+ static Result<std::shared_ptr<arrow::Array>> CastArrayForTimestamp(
+ const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::DataType>& target_data_type,
+ const std::shared_ptr<arrow::MemoryPool>& arrow_pool);
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp
b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp
new file mode 100644
index 0000000..4d764e1
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+// Every pair below differs in a timestamp unit or timezone somewhere in the type tree, so
+// NeedCastArrayForTimestamp must answer true for each of them.
+TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
+    struct TypePair {
+        std::shared_ptr<arrow::DataType> src;
+        std::shared_ptr<arrow::DataType> target;
+    };
+
+    const auto millis = arrow::timestamp(arrow::TimeUnit::MILLI);
+    const auto seconds = arrow::timestamp(arrow::TimeUnit::SECOND);
+    const auto nanos = arrow::timestamp(arrow::TimeUnit::NANO);
+    const auto nanos_utc = arrow::timestamp(arrow::TimeUnit::NANO, "UTC");
+
+    const TypePair cases[] = {
+        // single field need cast
+        {arrow::struct_({arrow::field("f0", nanos)}),
+         arrow::struct_({arrow::field("f0", nanos_utc)})},
+        // field in list need cast
+        {arrow::struct_({arrow::field("f2", arrow::list(millis))}),
+         arrow::struct_({arrow::field("f2", arrow::list(seconds))})},
+        // field in map need cast
+        {arrow::struct_({arrow::field("f1", arrow::map(millis, nanos))}),
+         arrow::struct_({arrow::field("f1", arrow::map(seconds, nanos_utc))})},
+        // field in struct need cast
+        {arrow::struct_({arrow::field(
+             "f3", arrow::struct_({arrow::field("f0", millis), arrow::field("f1", nanos)}))}),
+         arrow::struct_({arrow::field(
+             "f3",
+             arrow::struct_({arrow::field("f0", millis), arrow::field("f1", nanos_utc)}))})},
+    };
+
+    for (const TypePair& type_pair : cases) {
+        ASSERT_OK_AND_ASSIGN(bool need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                                 type_pair.src, type_pair.target));
+        ASSERT_TRUE(need_cast);
+    }
+}
+
+TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) {
+ // Checks that CastArrayForTimestamp converts timestamps nested in map, list and struct
+ // fields as well as a top-level timestamp field.
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ // Source types: millisecond timestamps and timestamps that are zone-less or in UTC.
+ arrow::FieldVector fields = {
+ arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+ arrow::timestamp(arrow::TimeUnit::MICRO,
"UTC"))),
+ arrow::field("f2",
arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI))),
+ arrow::field("f3", arrow::struct_(
+ {arrow::field("f0",
arrow::timestamp(arrow::TimeUnit::MILLI, "UTC")),
+ arrow::field("f1",
arrow::timestamp(arrow::TimeUnit::NANO))})),
+ arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+ };
+
+ // Target types: the millisecond timestamps become seconds, and the zoned or nanosecond
+ // timestamps carry the local timezone name.
+ arrow::FieldVector target_fields = {
+ arrow::field("f1",
arrow::map(arrow::timestamp(arrow::TimeUnit::SECOND),
+ arrow::timestamp(arrow::TimeUnit::MICRO,
timezone))),
+ arrow::field("f2",
arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND))),
+ arrow::field("f3",
+ arrow::struct_(
+ {arrow::field("f0",
arrow::timestamp(arrow::TimeUnit::SECOND, timezone)),
+ arrow::field("f1",
arrow::timestamp(arrow::TimeUnit::NANO, timezone))})),
+ arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO, timezone)),
+ };
+
+ // Three rows; the last one has null map, list and struct values.
+ auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+[[["1970-01-01 00:00:01", "1970-01-01 00:00:00.000001"]], ["1970-01-01
00:00:02"], ["1970-01-01 00:00:02", "1970-01-01 00:00:00.000000002"],
"1970-01-01 00:00:00.000000002"],
+[[["1970-01-01 00:00:03", "1970-01-01 00:00:00.000003"]], ["1970-01-01
00:00:04"], ["1970-01-01 00:00:04", "1970-01-01 00:00:00.000000004"],
"1970-01-01 00:00:00.000000004"],
+[null, null, null, "1970-01-01 00:00:00.000000004"]
+ ])")
+ .ValueOrDie());
+
+ std::shared_ptr<arrow::MemoryPool> pool = GetArrowPool(GetDefaultPool());
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> result_array,
+ ParquetTimestampConverter::CastArrayForTimestamp(
+ array, arrow::struct_(target_fields), pool));
+
+ // The expected array is parsed from the same JSON text, this time with the target types.
+ auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(target_fields), R"([
+[[["1970-01-01 00:00:01", "1970-01-01 00:00:00.000001"]], ["1970-01-01
00:00:02"], ["1970-01-01 00:00:02", "1970-01-01 00:00:00.000000002"],
"1970-01-01 00:00:00.000000002"],
+[[["1970-01-01 00:00:03", "1970-01-01 00:00:00.000003"]], ["1970-01-01
00:00:04"], ["1970-01-01 00:00:04", "1970-01-01 00:00:00.000000004"],
"1970-01-01 00:00:00.000000004"],
+[null, null, null, "1970-01-01 00:00:00.000000004"]
+ ])")
+ .ValueOrDie());
+ ASSERT_TRUE(result_array->Equals(expected_array)) <<
result_array->ToString();
+}
+
+// AdjustTimezone must swap the zone of every zoned timestamp (here "UTC") for the local
+// timezone name and leave zone-less timestamps untouched, at every nesting level.
+TEST(ParquetTimestampConverterTest, TestAdjustTimezone) {
+    // Builds the same struct type for a given zone; only the zoned timestamps depend on it.
+    const auto make_struct_type = [](const auto& zone) {
+        return arrow::struct_({
+            arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+                                          arrow::timestamp(arrow::TimeUnit::MICRO, zone))),
+            arrow::field("f2", arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI, zone))),
+            arrow::field(
+                "f3",
+                arrow::struct_(
+                    {arrow::field("f0", arrow::timestamp(arrow::TimeUnit::MILLI)),
+                     arrow::field("f1", arrow::timestamp(arrow::TimeUnit::MICRO, zone))})),
+            arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        });
+    };
+
+    const auto local_zone = DateTimeUtils::GetLocalTimezoneName();
+    const auto input_type = make_struct_type("UTC");
+    const auto expected_type = make_struct_type(local_zone);
+
+    ASSERT_OK_AND_ASSIGN(auto result_type, ParquetTimestampConverter::AdjustTimezone(input_type));
+    ASSERT_TRUE(result_type->Equals(expected_type));
+}
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_writer_builder.cpp
b/src/paimon/format/parquet/parquet_writer_builder.cpp
new file mode 100644
index 0000000..7fa1899
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+#include <optional>
+#include <utility>
+
+#include "arrow/util/compression.h"
+#include "arrow/util/type_fwd.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/core/core_options.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/status.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace paimon {
+class OutputStream;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+Result<std::unique_ptr<FormatWriter>> ParquetWriterBuilder::Build(  // Creates the parquet FormatWriter that writes to `out` with the codec named by `compression`.
+ const std::shared_ptr<OutputStream>& out, const std::string& compression) {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::WriterProperties>
writer_properties,
+ PrepareWriterProperties(compression));  // codec name + option map -> parquet WriterProperties
+ PAIMON_ASSIGN_OR_RAISE(uint64_t max_memory_use,
OptionsUtils::GetValueFromMap<uint64_t>(
+ options_,
PARQUET_WRITER_MAX_MEMORY_USE,
+
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE));  // writer memory limit option; the DEFAULT_ constant is passed as the fallback value
+
+ return ParquetFormatWriter::Create(out, schema_, writer_properties,
max_memory_use, pool_);  // the writer receives the stream, schema, properties, memory limit and Arrow pool
+}
+
+Result<std::shared_ptr<::parquet::WriterProperties>>
ParquetWriterBuilder::PrepareWriterProperties(  // Builds parquet WriterProperties from `compression` and the builder's option map.
+ const std::string& compression) {
+ PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options,
CoreOptions::FromMap(options_));  // only used below for the zstd level fallback
+ PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type compression_type,
+ ArrowUtils::GetCompressionType(compression));  // codec name -> Arrow compression enum; an error result is returned to the caller
+ ::parquet::WriterProperties::Builder builder;
+ builder.memory_pool(pool_.get());  // raw pointer only; pool_ keeps the pool alive for this builder
+ builder.write_batch_size(batch_size_);
+ builder.compression(compression_type);
+ PAIMON_ASSIGN_OR_RAISE(
+ int64_t row_group_length,
+ OptionsUtils::GetValueFromMap<int64_t>(options_,
PARQUET_WRITE_MAX_ROW_GROUP_LENGTH,
+
DEFAULT_PARQUET_WRITE_MAX_ROW_GROUP_LENGTH));
+ builder.max_row_group_length(row_group_length);
+ builder.enable_store_decimal_as_integer();  // always on; no option controls it in this function
+ if (arrow::util::Codec::SupportsCompressionLevel(compression_type)) {  // a level is configured only for codecs Arrow reports as level-capable
+ std::string key = CompressLevelOptionsKey(compression_type);  // NOTE(review): "" for codecs other than gzip/brotli/zstd; the lookup below then uses key "" and level 1 - confirm intended
+ PAIMON_ASSIGN_OR_RAISE(
+ int64_t file_compression_level,
+ OptionsUtils::GetValueFromMap<int64_t>(options_, key,
+ (key ==
PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL)
+ ?
core_options.GetFileCompressionZstdLevel()
+ : 1));  // fallback level: the core zstd level for zstd, 1 for every other codec
+ builder.compression_level(file_compression_level);
+ }
+
+ PAIMON_ASSIGN_OR_RAISE(int64_t row_group_size,
OptionsUtils::GetValueFromMap<int64_t>(
+ options_,
PARQUET_BLOCK_SIZE,
+
::parquet::DEFAULT_MAX_ROW_GROUP_SIZE));
+ builder.max_row_group_size(row_group_size);  // PARQUET_BLOCK_SIZE sets the row-group size limit
+
+ PAIMON_ASSIGN_OR_RAISE(int64_t page_size,
+ OptionsUtils::GetValueFromMap<int64_t>(options_,
PARQUET_PAGE_SIZE,
+
::parquet::kDefaultDataPageSize));
+ builder.data_pagesize(page_size);
+ PAIMON_ASSIGN_OR_RAISE(bool enable_dictionary,
OptionsUtils::GetValueFromMap<bool>(
+ options_,
PARQUET_ENABLE_DICTIONARY,
+
::parquet::DEFAULT_IS_DICTIONARY_ENABLED));
+ enable_dictionary ? builder.enable_dictionary() :
builder.disable_dictionary();  // ternary used only to pick which builder call runs; its result is discarded
+ PAIMON_ASSIGN_OR_RAISE(
+ int64_t dictionary_page_size,
+ OptionsUtils::GetValueFromMap<int64_t>(options_,
PARQUET_DICTIONARY_PAGE_SIZE,
+
::parquet::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT));
+ builder.dictionary_pagesize_limit(dictionary_page_size);
+ PAIMON_ASSIGN_OR_RAISE(std::string writer_version,
+ OptionsUtils::GetValueFromMap<std::string>(
+ options_, PARQUET_WRITER_VERSION,
std::string("PARQUET_2_0")));  // "PARQUET_2_0" is the fallback version string
+ PAIMON_ASSIGN_OR_RAISE(::parquet::ParquetVersion::type version,
+ ConvertWriterVersion(writer_version));  // unknown version strings fail here
+ builder.version(version);
+ return builder.build();
+}
+
+Result<::parquet::ParquetVersion::type>
ParquetWriterBuilder::ConvertWriterVersion(  // Maps a writer-version option string to the parquet library's version enum.
+ const std::string& writer_version) {
+ if (writer_version == "PARQUET_1_0" || writer_version == "v1") {  // comparison is exact, so spelling and case must match
+ return ::parquet::ParquetVersion::type::PARQUET_1_0;
+ } else if (writer_version == "PARQUET_2_0" || writer_version == "v2") {
+ return ::parquet::ParquetVersion::type::PARQUET_2_6;  // "PARQUET_2_0"/"v2" select the PARQUET_2_6 enum value, not a 2_0 one
+ }
+ return Status::Invalid(fmt::format("Unknown writer version {}",
writer_version));  // every other string is rejected and echoed in the message
+}
+
+std::string
ParquetWriterBuilder::CompressLevelOptionsKey(::arrow::Compression::type
compression) {  // Returns the option key that holds the compression level for `compression`.
+ switch (compression) {
+ case ::arrow::Compression::GZIP:
+ return PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL;  // gzip reads its level from the zlib-named option
+ case ::arrow::Compression::BROTLI:
+ return PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL;
+ case ::arrow::Compression::ZSTD:
+ return PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL;
+ default:
+ return "";  // every other codec has no dedicated level key; callers receive an empty string
+ }
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_writer_builder.h
b/src/paimon/format/parquet/parquet_writer_builder.h
new file mode 100644
index 0000000..7862535
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/util/type_fwd.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+namespace paimon {
+class OutputStream;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+class ParquetWriterBuilder : public WriterBuilder {  // WriterBuilder implementation whose Build() creates a parquet format writer
+ public:
+ ParquetWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t
batch_size,
+ const std::map<std::string, std::string>& options)
+ : batch_size_(batch_size),
+ pool_(GetArrowPool(GetDefaultPool())),  // default pool is used until WithMemoryPool() replaces it
+ schema_(schema),
+ options_(options) {  // the option map is copied into the builder
+ assert(schema);  // schema must be non-null; only checked when asserts are compiled in
+ }
+
+ WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool)
override {
+ pool_ = GetArrowPool(pool);  // wraps the paimon pool as the Arrow pool used by later builds
+ return this;  // returns this builder rather than a new object
+ }
+
+ Result<std::unique_ptr<FormatWriter>> Build(const
std::shared_ptr<OutputStream>& out,
+ const std::string&
compression) override;  // `compression` is a codec name such as "zstd"
+
+ private:
+ static Result<::parquet::ParquetVersion::type> ConvertWriterVersion(
+ const std::string& writer_version);  // version option string -> parquet version enum, or an error
+
+ static std::string CompressLevelOptionsKey(::arrow::Compression::type
compression);  // option key holding the level for a codec; "" when the codec has none
+
+ Result<std::shared_ptr<::parquet::WriterProperties>>
PrepareWriterProperties(
+ const std::string& compression);  // assembles WriterProperties from options_, batch_size_, pool_ and the codec name
+
+ int32_t batch_size_ = -1;  // forwarded to the properties builder as the write batch size
+ std::shared_ptr<arrow::MemoryPool> pool_;
+ std::shared_ptr<arrow::Schema> schema_;
+ std::map<std::string, std::string> options_;  // raw string options; parsed when properties are prepared
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_writer_builder_test.cpp
b/src/paimon/format/parquet/parquet_writer_builder_test.cpp
new file mode 100644
index 0000000..2c5de2c
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder_test.cpp
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+#include <limits>
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon::parquet::test {
+
+TEST(ParquetWriterBuilderTest, DefaultPrepareWriterProperties) {  // with no parquet-specific options set, the properties carry the default values asserted below
+ arrow::FieldVector fields;
+ std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);  // an empty schema is enough; only properties are inspected
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("zstd"));  // NOTE(review): PrepareWriterProperties is declared private; presumably the test build relaxes access control - confirm
+ ASSERT_EQ(::parquet::ParquetVersion::PARQUET_2_6, properties->version());  // the default version string "PARQUET_2_0" resolves to PARQUET_2_6
+ ASSERT_EQ(1024 * 1024, properties->dictionary_pagesize_limit());
+ ASSERT_EQ(1024 * 1024, properties->data_pagesize());
+ ASSERT_EQ(std::numeric_limits<int64_t>::max(),
properties->max_row_group_length());
+ ASSERT_EQ(128 * 1024 * 1024, properties->max_row_group_size());
+ ASSERT_EQ(1024, properties->write_batch_size());  // equals the batch_size given to the constructor
+ ASSERT_EQ(1, properties->default_column_properties().compression_level());  // zstd level when neither level option is set
+ ASSERT_TRUE(properties->store_decimal_as_integer());
+}
+
+TEST(ParquetWriterBuilderTest, PrepareWriterProperties) {  // explicitly set parquet options must show up in the built properties
+ arrow::FieldVector fields;
+ std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+ std::map<std::string, std::string> options;
+ options[PARQUET_PAGE_SIZE] = "1024";
+ options[PARQUET_DICTIONARY_PAGE_SIZE] = "4096";
+ options[PARQUET_WRITER_VERSION] = "PARQUET_2_0";
+ options[PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL] = "3";
+ options[PARQUET_BLOCK_SIZE] = "2048";
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("zstd"));
+ // Version numbering differs between C++ and Java Parquet implementations.
Java's PARQUET_2_0
+ // format version corresponds to C++'s PARQUET_2_6 enum value.
+ ASSERT_EQ(::parquet::ParquetVersion::PARQUET_2_6, properties->version());
+ ASSERT_EQ(4096, properties->dictionary_pagesize_limit());
+ ASSERT_EQ(1024, properties->data_pagesize());
+ ASSERT_EQ(2048, properties->max_row_group_size());  // PARQUET_BLOCK_SIZE is reported as the max row-group size
+ ASSERT_EQ(1024 * 1024, properties->write_batch_size());
+ ASSERT_EQ(3, properties->default_column_properties().compression_level());  // taken from the parquet zstd level option
+}
+
+TEST(ParquetWriterBuilderTest, PrepareWriterPropertiesWithZstdLevelPriority) {  // the parquet-specific zstd level wins over the core file-compression zstd level
+ arrow::FieldVector fields;
+ std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+ {  // both level options set: the parquet-specific value (3) is used
+ std::map<std::string, std::string> options;
+ options[Options::FILE_COMPRESSION_ZSTD_LEVEL] = "4";
+ options[PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL] = "3";
+ options[PARQUET_WRITER_VERSION] = "PARQUET_1_0";
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024,
options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("zstd"));
+ ASSERT_EQ(3,
properties->default_column_properties().compression_level());
+ ASSERT_EQ(::parquet::ParquetVersion::PARQUET_1_0,
properties->version());
+ }
+ {  // only the core level option set: it is used as the fallback (4)
+ std::map<std::string, std::string> options;
+ options[Options::FILE_COMPRESSION_ZSTD_LEVEL] = "4";
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024,
options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("zstd"));
+ ASSERT_EQ(4,
properties->default_column_properties().compression_level());
+ }
+}
+
+TEST(ParquetWriterBuilderTest, TestPrepareWriterPropertiesFileCompression) {  // each codec name must resolve to the expected Arrow compression type; unknown names must fail
+ arrow::FieldVector fields;
+ std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+ {  // "lz4" selects LZ4_FRAME, not the raw LZ4 codec
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("lz4"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::LZ4_FRAME);
+ }
+ {  // "lz4_raw" in mixed case selects the raw LZ4 codec
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("lz4_raW"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
arrow::Compression::LZ4);
+ }
+ {
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("zstd"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
arrow::Compression::ZSTD);
+ }
+ {  // upper-case spelling resolves the same way as "lz4"
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("LZ4"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::LZ4_FRAME);
+ }
+ {  // mixed-case spelling resolves the same way as "zstd"
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("ZSTd"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
arrow::Compression::ZSTD);
+ }
+ {  // gzip takes its level from the zlib level option
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ options[PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL] = "3";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("gzip"));
+ ASSERT_EQ(3,
properties->default_column_properties().compression_level());
+ ASSERT_EQ(properties->default_column_properties().compression(),
arrow::Compression::GZIP);
+ }
+ {
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("snappy"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::SNAPPY);
+ }
+ {
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("lzo"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
arrow::Compression::LZO);
+ }
+ {  // an unrecognized codec name yields an error instead of properties
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_NOK(builder.PrepareWriterProperties("unknown"));
+ }
+ {
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("uncompressed"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::UNCOMPRESSED);
+ }
+ {
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("lz4_hadoop"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::LZ4_HADOOP);
+ }
+ {  // brotli takes its level from the brotli level option
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ options[PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL] = "2";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("brotli"));
+ ASSERT_EQ(2,
properties->default_column_properties().compression_level());
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::BROTLI);
+ }
+ {  // "None" is accepted as another spelling of uncompressed
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_OK_AND_ASSIGN(auto properties,
builder.PrepareWriterProperties("None"));
+ ASSERT_EQ(properties->default_column_properties().compression(),
+ arrow::Compression::UNCOMPRESSED);
+ }
+}
+
+TEST(ParquetWriterBuilderTest, TestInvalidWriterVersion) {  // an unsupported writer-version string must surface as an error naming that string
+ arrow::FieldVector fields;
+ std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+ std::map<std::string, std::string> options;
+ options[Options::FILE_FORMAT] = "parquet";
+ options[Options::MANIFEST_FORMAT] = "parquet";
+ options[PARQUET_WRITER_VERSION] = "PARQUET_3_0";  // matches none of the strings ConvertWriterVersion accepts
+ ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+ ASSERT_NOK_WITH_MSG(builder.PrepareWriterProperties("zstd"),
+ "Unknown writer version PARQUET_3_0");
+}
+
+} // namespace paimon::parquet::test