This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e55d84  feat(format): introduce parquet format components (#48)
5e55d84 is described below

commit 5e55d84d5818544c5a9357b4c48f6f6a2e123724
Author: Zhang Jiawei <[email protected]>
AuthorDate: Thu Jun 4 15:30:55 2026 +0800

    feat(format): introduce parquet format components (#48)
---
 LICENSE                                            |   1 +
 .../format/parquet/parquet_field_id_converter.cpp  | 120 +++++++++++
 .../format/parquet/parquet_field_id_converter.h    |  57 +++++
 .../parquet/parquet_field_id_converter_test.cpp    | 224 +++++++++++++++++++
 src/paimon/format/parquet/parquet_file_format.h    |  80 +++++++
 .../format/parquet/parquet_file_format_factory.cpp |  37 ++++
 .../format/parquet/parquet_file_format_factory.h   |  43 ++++
 src/paimon/format/parquet/parquet_format_defs.h    |  76 +++++++
 src/paimon/format/parquet/parquet_reader_builder.h |  66 ++++++
 src/paimon/format/parquet/parquet_schema_util.cpp  | 230 ++++++++++++++++++++
 src/paimon/format/parquet/parquet_schema_util.h    |  58 +++++
 .../format/parquet/parquet_timestamp_converter.cpp | 233 ++++++++++++++++++++
 .../format/parquet/parquet_timestamp_converter.h   |  46 ++++
 .../parquet/parquet_timestamp_converter_test.cpp   | 167 +++++++++++++++
 .../format/parquet/parquet_writer_builder.cpp      | 130 +++++++++++
 src/paimon/format/parquet/parquet_writer_builder.h |  83 ++++++++
 .../format/parquet/parquet_writer_builder_test.cpp | 237 +++++++++++++++++++++
 17 files changed, 1888 insertions(+)

diff --git a/LICENSE b/LICENSE
index 103c326..c69084f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -270,6 +270,7 @@ This product includes code from Apache Arrow.
 
 * Core utilities:
   * docs utilities in docs/ directory
+  * parquet schema convertor in 
src/paimon/format/parquet/parquet_schema_util.cpp and 
src/paimon/format/parquet/parquet_schema_util.h
   * basic utilities in
     - include/paimon/compare.h
     - include/paimon/macros.h
diff --git a/src/paimon/format/parquet/parquet_field_id_converter.cpp 
b/src/paimon/format/parquet/parquet_field_id_converter.cpp
new file mode 100644
index 0000000..010df3a
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+
+#include <algorithm>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+
+namespace paimon::parquet {
+
+using arrow::Field;
+using arrow::KeyValueMetadata;
+
+const char ParquetFieldIdConverter::PARQUET_FIELD_ID[] = "PARQUET:field_id";
+
+// Write path. Produces a schema in which every field (including fields nested in struct,
+// list and map types) that has a DataField::FIELD_ID metadata entry also has the same value
+// stored under PARQUET_FIELD_ID. Schema-level metadata is passed through unchanged.
+// Returns the converted error if processing any field fails.
+Result<std::shared_ptr<arrow::Schema>> ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(
+    const std::shared_ptr<arrow::Schema>& schema) {
+    arrow::FieldVector converted;
+    for (const std::shared_ptr<Field>& original : schema->fields()) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            std::shared_ptr<Field> rewritten,
+            ProcessField(original, IdConvertType::PAIMON_TO_PARQUET_ID));
+        converted.push_back(rewritten);
+    }
+    return arrow::schema(converted, schema->metadata());
+}
+
+// Read path. Produces a schema in which every field (including fields nested in struct,
+// list and map types) that has a PARQUET_FIELD_ID metadata entry also has the same value
+// stored under DataField::FIELD_ID. Schema-level metadata is passed through unchanged.
+// Returns the converted error if processing any field fails.
+Result<std::shared_ptr<arrow::Schema>> ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(
+    const std::shared_ptr<arrow::Schema>& schema) {
+    arrow::FieldVector converted;
+    for (const std::shared_ptr<Field>& original : schema->fields()) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            std::shared_ptr<Field> rewritten,
+            ProcessField(original, IdConvertType::PARQUET_TO_PAIMON_ID));
+        converted.push_back(rewritten);
+    }
+    return arrow::schema(converted, schema->metadata());
+}
+
+// Duplicates a field id from one metadata key to the other, as selected by `convert_type`:
+//   PAIMON_TO_PARQUET_ID: DataField::FIELD_ID -> PARQUET_FIELD_ID (write process)
+//   PARQUET_TO_PAIMON_ID: PARQUET_FIELD_ID -> DataField::FIELD_ID (read process)
+// `metadata` is returned as-is when it is null or does not contain the source key; otherwise
+// the result is `metadata` merged with the single new entry. Any other `convert_type` yields
+// arrow::Status::Invalid.
+arrow::Result<std::shared_ptr<const KeyValueMetadata>> ParquetFieldIdConverter::CopyId(
+    const std::shared_ptr<const KeyValueMetadata>& metadata,
+    ParquetFieldIdConverter::IdConvertType convert_type) {
+    std::string source_key;
+    std::string target_key;
+    switch (convert_type) {
+        case IdConvertType::PAIMON_TO_PARQUET_ID:
+            source_key = DataField::FIELD_ID;
+            target_key = PARQUET_FIELD_ID;
+            break;
+        case IdConvertType::PARQUET_TO_PAIMON_ID:
+            source_key = PARQUET_FIELD_ID;
+            target_key = DataField::FIELD_ID;
+            break;
+        default:
+            return arrow::Status::Invalid(
+                "only support PAIMON_TO_PARQUET_ID and PARQUET_TO_PAIMON_ID");
+    }
+
+    // Nothing to copy: hand the caller back exactly what it passed in.
+    if (metadata == nullptr || !metadata->Contains(source_key)) {
+        return metadata;
+    }
+    ARROW_ASSIGN_OR_RAISE(std::string id_value, metadata->Get(source_key));
+    std::vector<std::string> entry_keys = {target_key};
+    std::vector<std::string> entry_values = {id_value};
+    auto id_entry = KeyValueMetadata::Make(entry_keys, entry_values);
+    return metadata->Merge(*id_entry);
+}
+
+// Rebuilds `field` with its id copied between metadata keys as selected by `convert_type`,
+// recursing into the children of STRUCT, LIST and MAP types so nested fields are converted
+// as well. Fields of any other type only have their own metadata updated.
+// Returns the error of the first conversion that fails.
+arrow::Result<std::shared_ptr<Field>> ParquetFieldIdConverter::ProcessField(
+    const std::shared_ptr<Field>& field, ParquetFieldIdConverter::IdConvertType convert_type) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<const KeyValueMetadata> updated_metadata,
+                          CopyId(field->metadata(), convert_type));
+    auto type = field->type();
+    if (type->id() == arrow::Type::STRUCT) {
+        auto struct_type = std::static_pointer_cast<arrow::StructType>(type);
+        std::vector<std::shared_ptr<Field>> new_fields;
+        new_fields.reserve(struct_type->fields().size());
+        for (const auto& child : struct_type->fields()) {
+            ARROW_ASSIGN_OR_RAISE(auto new_child, ProcessField(child, convert_type));
+            new_fields.push_back(std::move(new_child));
+        }
+        auto new_type = arrow::struct_(new_fields);
+        return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+    } else if (type->id() == arrow::Type::LIST) {
+        auto list_type = std::static_pointer_cast<arrow::ListType>(type);
+        ARROW_ASSIGN_OR_RAISE(auto new_value_field,
+                              ProcessField(list_type->value_field(), convert_type));
+        auto new_type = arrow::list(new_value_field);
+        return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+    } else if (type->id() == arrow::Type::MAP) {
+        auto map_type = std::static_pointer_cast<arrow::MapType>(type);
+        ARROW_ASSIGN_OR_RAISE(auto new_key_field,
+                              ProcessField(map_type->key_field(), convert_type));
+        ARROW_ASSIGN_OR_RAISE(auto new_item_field,
+                              ProcessField(map_type->item_field(), convert_type));
+        // Only the (converted) key type is handed to arrow::map(); the processed key field
+        // object itself is not. keys_sorted has to be forwarded explicitly: arrow::map()
+        // defaults it to false, which would make the rebuilt type differ from the input type
+        // for sorted maps.
+        auto new_type =
+            arrow::map(new_key_field->type(), new_item_field, map_type->keys_sorted());
+        return field->WithType(new_type)->WithMergedMetadata(updated_metadata);
+    }
+
+    return field->WithMergedMetadata(updated_metadata);
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_field_id_converter.h 
b/src/paimon/format/parquet/parquet_field_id_converter.h
new file mode 100644
index 0000000..4e26668
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+// Static-only helper (constructor and destructor are deleted) that copies field ids between
+// the 'paimon.id' and 'PARQUET:field_id' metadata entries of Arrow schema fields.
+class ParquetFieldIdConverter {
+ public:
+    ParquetFieldIdConverter() = delete;
+    ~ParquetFieldIdConverter() = delete;
+
+    // Metadata key under which the Parquet field id is stored.
+    static const char PARQUET_FIELD_ID[];
+    // Iterate through all fields in the Schema.
+    // For each field, copy the value of 'paimon.id' to 'PARQUET:field_id'.
+    static Result<std::shared_ptr<arrow::Schema>> AddParquetIdsFromPaimonIds(
+        const std::shared_ptr<arrow::Schema>& schema);
+    // Iterate through all fields in the Schema.
+    // For each field, copy the value of 'PARQUET:field_id' to 'paimon.id'.
+    static Result<std::shared_ptr<arrow::Schema>> GetPaimonIdsFromParquetIds(
+        const std::shared_ptr<arrow::Schema>& schema);
+
+ private:
+    // Direction of the id copy performed by ProcessField / CopyId.
+    enum class IdConvertType {
+        PARQUET_TO_PAIMON_ID = 1,
+        PAIMON_TO_PARQUET_ID = 2,
+    };
+
+    // Converts a single field in the direction given by `convert_type`.
+    static arrow::Result<std::shared_ptr<arrow::Field>> ProcessField(
+        const std::shared_ptr<arrow::Field>& field, IdConvertType 
convert_type);
+
+    // Converts a single metadata object in the direction given by `convert_type`.
+    static arrow::Result<std::shared_ptr<const arrow::KeyValueMetadata>> 
CopyId(
+        const std::shared_ptr<const arrow::KeyValueMetadata>& metadata, 
IdConvertType convert_type);
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_field_id_converter_test.cpp 
b/src/paimon/format/parquet/parquet_field_id_converter_test.cpp
new file mode 100644
index 0000000..e85cb44
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_field_id_converter_test.cpp
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+// Fixture providing helpers that walk a schema and collect (name, type id, field id) for every
+// field that carries metadata, optionally dumping the fields to stdout.
+class ParquetFieldIdConverterTest : public ::testing::Test {
+ public:
+    // One collected field: its name, Arrow type id and the field id read from its metadata.
+    class FieldInfo {
+     public:
+        std::string field_name;
+        arrow::Type::type field_type;
+        std::string field_id;
+
+        std::string ToString() const {
+            return field_name + " " + arrow::internal::ToString(field_type) + 
" " + field_id;
+        }
+
+        bool operator==(const FieldInfo& rhs) const {
+            return field_name == rhs.field_name && field_type == 
rhs.field_type &&
+                   field_id == rhs.field_id;
+        }
+    };
+
+    void SetUp() override {
+        // Printing is off by default; flip print_ locally when debugging.
+        print_ = false;
+    }
+
+    // Visits every top-level field of `schema` (and, recursively, its children) and appends
+    // the collected entries to `field_infos`.
+    void PrintFieldMetadata(const std::shared_ptr<arrow::Schema>& schema,
+                            ParquetFieldIdConverter::IdConvertType 
convert_type,
+                            std::vector<FieldInfo>* field_infos) const {
+        for (const auto& field : schema->fields()) {
+            PrintFieldMetadataRecursive(field, /*indent=*/2, convert_type, 
field_infos);
+        }
+    }
+
+    // Pre-order walk: records `field` first (only if it has metadata), then descends into
+    // struct children, the list value field, or the map key field followed by the item field.
+    // For PAIMON_TO_PARQUET_ID the id is read from PARQUET_FIELD_ID, otherwise from
+    // DataField::FIELD_ID; ValueOrDie() is used, so a field with metadata but without that key
+    // is not tolerated.
+    void PrintFieldMetadataRecursive(const std::shared_ptr<arrow::Field>& 
field, int32_t indent,
+                                     ParquetFieldIdConverter::IdConvertType 
convert_type,
+                                     std::vector<FieldInfo>* field_infos) 
const {
+        std::string prefix(indent * 2, ' ');
+        if (print_) {
+            std::cout << prefix << "Field: " << field->name() << " (" << 
field->type()->ToString()
+                      << ")" << std::endl;
+        }
+
+        if (field->HasMetadata() && field->metadata()) {
+            auto meta = field->metadata();
+            std::string field_id;
+            if (convert_type == 
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID) {
+                field_id =
+                    
field->metadata()->Get(ParquetFieldIdConverter::PARQUET_FIELD_ID).ValueOrDie();
+            } else {
+                field_id = 
field->metadata()->Get(DataField::FIELD_ID).ValueOrDie();
+            }
+            field_infos->push_back({field->name(), field->type()->id(), 
field_id});
+            if (print_) {
+                for (int32_t i = 0; i < meta->size(); ++i) {
+                    std::cout << prefix << "  [meta] " << meta->key(i) << " = 
" << meta->value(i)
+                              << std::endl;
+                }
+            }
+        }
+
+        auto type_id = field->type()->id();
+        if (type_id == arrow::Type::STRUCT) {
+            auto struct_type = 
std::static_pointer_cast<arrow::StructType>(field->type());
+            for (const auto& child : struct_type->fields()) {
+                PrintFieldMetadataRecursive(child, indent + 1, convert_type, 
field_infos);
+            }
+        } else if (type_id == arrow::Type::LIST) {
+            auto list_type = 
std::static_pointer_cast<arrow::ListType>(field->type());
+            PrintFieldMetadataRecursive(list_type->value_field(), indent + 1, 
convert_type,
+                                        field_infos);
+        } else if (type_id == arrow::Type::MAP) {
+            auto map_type = 
std::static_pointer_cast<arrow::MapType>(field->type());
+            PrintFieldMetadataRecursive(map_type->key_field(), indent + 1, 
convert_type,
+                                        field_infos);
+            PrintFieldMetadataRecursive(map_type->item_field(), indent + 1, 
convert_type,
+                                        field_infos);
+        }
+    }
+
+ private:
+    // When true, the helpers above also print each visited field and its metadata.
+    bool print_ = false;
+};
+
+// Round trip over a mostly flat schema with one map column: ids assigned through TableSchema
+// are copied to PARQUET:field_id and then back, and both directions must yield the same
+// (name, type, id) list. Only f10 and the struct children k0/v0 are expected for the map
+// column; its key and item fields themselves do not appear in the expectation.
+TEST_F(ParquetFieldIdConverterTest, TestSimple) {
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::boolean()),
+        arrow::field("f2", arrow::int8()),
+        arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int32()),
+        arrow::field("f5", arrow::int64()),
+        arrow::field("f6", arrow::float32()),
+        arrow::field("f7", arrow::float64()),
+        arrow::field("f8", arrow::utf8()),
+        arrow::field("f9", arrow::binary()),
+        arrow::field("f10", arrow::map(arrow::list(arrow::float32()),
+                                       arrow::struct_({arrow::field("k0", 
arrow::boolean()),
+                                                       arrow::field("v0", 
arrow::int64())}))),
+        arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f12", arrow::date32()),
+        arrow::field("f13", arrow::decimal128(2, 2))};
+    auto schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, 
/*primary_keys=*/{},
+                            /*options=*/{}));
+    auto schema_with_field_id = 
DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
+    ASSERT_OK_AND_ASSIGN(auto new_schema,
+                         
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(schema_with_field_id));
+    // convert to PARQUET:field_id
+    std::vector<FieldInfo> field_infos;
+    PrintFieldMetadata(new_schema, 
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID,
+                       &field_infos);
+    std::vector<FieldInfo> expected_field_infos = {
+        {"f1", arrow::Type::BOOL, "0"},        {"f2", arrow::Type::INT8, "1"},
+        {"f3", arrow::Type::INT16, "2"},       {"f4", arrow::Type::INT32, "3"},
+        {"f5", arrow::Type::INT64, "4"},       {"f6", arrow::Type::FLOAT, "5"},
+        {"f7", arrow::Type::DOUBLE, "6"},      {"f8", arrow::Type::STRING, 
"7"},
+        {"f9", arrow::Type::BINARY, "8"},      {"f10", arrow::Type::MAP, "9"},
+        {"k0", arrow::Type::BOOL, "10"},       {"v0", arrow::Type::INT64, 
"11"},
+        {"f11", arrow::Type::TIMESTAMP, "12"}, {"f12", arrow::Type::DATE32, 
"13"},
+        {"f13", arrow::Type::DECIMAL128, "14"}};
+    ASSERT_EQ(expected_field_infos, field_infos);
+    // convert to paimon.id
+    ASSERT_OK_AND_ASSIGN(auto old_schema,
+                         
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(new_schema));
+    std::vector<FieldInfo> old_field_infos;
+    PrintFieldMetadata(old_schema, 
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID,
+                       &old_field_infos);
+    ASSERT_EQ(expected_field_infos, old_field_infos);
+}
+
+// Same round trip as TestSimple, but with the same five-field struct nested directly (f0),
+// as a list element (f1), and as both key and value of a map (f2). The expected ids run
+// 0..22 in the pre-order visited by PrintFieldMetadataRecursive.
+TEST_F(ParquetFieldIdConverterTest, TestNestedType) {
+    arrow::FieldVector fields = {
+        arrow::field("f0",
+                     arrow::struct_({arrow::field("sub1", arrow::date32()),
+                                     arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                     arrow::field("sub3", 
arrow::decimal128(23, 5)),
+                                     arrow::field("sub4", arrow::binary()),
+                                     arrow::field("sub5", arrow::binary())})),
+        arrow::field("f1", arrow::list(arrow::struct_(
+                               {arrow::field("sub1", arrow::date32()),
+                                arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                arrow::field("sub3", arrow::decimal128(23, 5)),
+                                arrow::field("sub4", arrow::binary()),
+                                arrow::field("sub5", arrow::binary())}))),
+        arrow::field(
+            "f2", arrow::map(
+                      arrow::struct_({arrow::field("sub1", arrow::date32()),
+                                      arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                      arrow::field("sub3", 
arrow::decimal128(23, 5)),
+                                      arrow::field("sub4", arrow::binary()),
+                                      arrow::field("sub5", arrow::binary())}),
+                      arrow::struct_({arrow::field("sub1", arrow::date32()),
+                                      arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                      arrow::field("sub3", 
arrow::decimal128(23, 5)),
+                                      arrow::field("sub4", arrow::binary()),
+                                      arrow::field("sub5", 
arrow::binary())})))};
+    auto schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, 
/*primary_keys=*/{},
+                            /*options=*/{}));
+    auto schema_with_field_id = 
DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
+    ASSERT_OK_AND_ASSIGN(auto new_schema,
+                         
ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(schema_with_field_id));
+    // convert to PARQUET:field_id
+    std::vector<FieldInfo> field_infos;
+    PrintFieldMetadata(new_schema, 
ParquetFieldIdConverter::IdConvertType::PAIMON_TO_PARQUET_ID,
+                       &field_infos);
+    std::vector<FieldInfo> expected_field_infos = {
+        {"f0", arrow::Type::STRUCT, "0"},        {"sub1", arrow::Type::DATE32, 
"1"},
+        {"sub2", arrow::Type::TIMESTAMP, "2"},   {"sub3", 
arrow::Type::DECIMAL128, "3"},
+        {"sub4", arrow::Type::BINARY, "4"},      {"sub5", arrow::Type::BINARY, 
"5"},
+        {"f1", arrow::Type::LIST, "6"},          {"sub1", arrow::Type::DATE32, 
"7"},
+        {"sub2", arrow::Type::TIMESTAMP, "8"},   {"sub3", 
arrow::Type::DECIMAL128, "9"},
+        {"sub4", arrow::Type::BINARY, "10"},     {"sub5", arrow::Type::BINARY, 
"11"},
+        {"f2", arrow::Type::MAP, "12"},          {"sub1", arrow::Type::DATE32, 
"13"},
+        {"sub2", arrow::Type::TIMESTAMP, "14"},  {"sub3", 
arrow::Type::DECIMAL128, "15"},
+        {"sub4", arrow::Type::BINARY, "16"},     {"sub5", arrow::Type::BINARY, 
"17"},
+        {"sub1", arrow::Type::DATE32, "18"},     {"sub2", 
arrow::Type::TIMESTAMP, "19"},
+        {"sub3", arrow::Type::DECIMAL128, "20"}, {"sub4", arrow::Type::BINARY, 
"21"},
+        {"sub5", arrow::Type::BINARY, "22"}};
+    ASSERT_EQ(expected_field_infos, field_infos);
+    // convert to paimon.id
+    ASSERT_OK_AND_ASSIGN(auto old_schema,
+                         
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(new_schema));
+    std::vector<FieldInfo> old_field_infos;
+    PrintFieldMetadata(old_schema, 
ParquetFieldIdConverter::IdConvertType::PARQUET_TO_PAIMON_ID,
+                       &old_field_infos);
+    ASSERT_EQ(expected_field_infos, old_field_infos);
+}
+
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_file_format.h 
b/src/paimon/format/parquet/parquet_file_format.h
new file mode 100644
index 0000000..d19ef31
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_reader_builder.h"
+#include "paimon/format/parquet/parquet_stats_extractor.h"
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class WriterBuilder;
+class ReaderBuilder;
+class FormatStatsExtractor;
+
+namespace parquet {
+
+// FileFormat implementation whose identifier is "parquet". It keeps its own copy of the
+// option map and passes it to the reader and writer builders it creates.
+class ParquetFileFormat : public FileFormat {
+ public:
+    explicit ParquetFileFormat(const std::map<std::string, std::string>& options)
+        : identifier_("parquet"), options_(options) {}
+
+    // Returns the identifier set at construction ("parquet").
+    const std::string& Identifier() const override {
+        return identifier_;
+    }
+
+    // Creates a ParquetReaderBuilder from the stored options and `batch_size`.
+    Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(int32_t batch_size) const override {
+        return std::make_unique<ParquetReaderBuilder>(options_, batch_size);
+    }
+
+    // Imports the C-interface `schema`, adds Parquet field ids derived from the Paimon ids,
+    // and creates a ParquetWriterBuilder for the resulting schema. Fails if either the import
+    // or the id conversion fails.
+    // NOTE(review): arrow::ImportSchema is documented to release `schema` - confirm callers
+    // do not reuse it afterwards.
+    Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(::ArrowSchema* schema,
+                                                               int32_t batch_size) const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> imported,
+                                          arrow::ImportSchema(schema));
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> with_parquet_ids,
+                               ParquetFieldIdConverter::AddParquetIdsFromPaimonIds(imported));
+        return std::make_unique<ParquetWriterBuilder>(with_parquet_ids, batch_size, options_);
+    }
+
+    // Imports the C-interface `schema` and creates a ParquetStatsExtractor for it. Fails if
+    // the import fails.
+    Result<std::unique_ptr<FormatStatsExtractor>> CreateStatsExtractor(
+        ::ArrowSchema* schema) const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> imported,
+                                          arrow::ImportSchema(schema));
+        return std::make_unique<ParquetStatsExtractor>(imported);
+    }
+
+ protected:
+    std::string identifier_;
+    std::map<std::string, std::string> options_;
+};
+
+}  // namespace parquet
+}  // namespace paimon
diff --git a/src/paimon/format/parquet/parquet_file_format_factory.cpp 
b/src/paimon/format/parquet/parquet_file_format_factory.cpp
new file mode 100644
index 0000000..bef5b9f
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_format_factory.h"
+
+#include <utility>
+
+#include "paimon/factories/factory.h"
+#include "paimon/format/parquet/parquet_file_format.h"
+
+namespace paimon::parquet {
+
+const char ParquetFileFormatFactory::IDENTIFIER[] = "parquet";
+
+// Creates a ParquetFileFormat constructed from `options`.
+Result<std::unique_ptr<FileFormat>> ParquetFileFormatFactory::Create(
+    const std::map<std::string, std::string>& options) const {
+    return std::make_unique<ParquetFileFormat>(options);
+}
+
+REGISTER_PAIMON_FACTORY(ParquetFileFormatFactory);
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_format_factory.h 
b/src/paimon/format/parquet/parquet_file_format_factory.h
new file mode 100644
index 0000000..87aa121
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_format_factory.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+// FileFormatFactory for the parquet format; Identifier() reports IDENTIFIER.
+class ParquetFileFormatFactory : public FileFormatFactory {
+ public:
+    // Identifier string of this factory (defined in the .cpp).
+    static const char IDENTIFIER[];
+
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+
+    // Creates the file format for the given options.
+    Result<std::unique_ptr<FileFormat>> Create(
+        const std::map<std::string, std::string>& options) const override;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_defs.h 
b/src/paimon/format/parquet/parquet_format_defs.h
new file mode 100644
index 0000000..a1ae347
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_defs.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+namespace paimon::parquet {
+
+// write
+static inline const char PARQUET_BLOCK_SIZE[] = "parquet.block.size";
+static inline const char PARQUET_PAGE_SIZE[] = "parquet.page.size";
+static inline const char PARQUET_DICTIONARY_PAGE_SIZE[] = 
"parquet.dictionary.page.size";
+static inline const char PARQUET_ENABLE_DICTIONARY[] = 
"parquet.enable-dictionary";
+static inline const char PARQUET_WRITER_VERSION[] = "parquet.writer.version";
+static inline const char PARQUET_WRITE_MAX_ROW_GROUP_LENGTH[] =
+    "parquet.write.max-row-group-length";
+static constexpr int64_t DEFAULT_PARQUET_WRITE_MAX_ROW_GROUP_LENGTH =
+    std::numeric_limits<int64_t>::max();
+static inline const char PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL[] =
+    "parquet.compression.codec.zstd.level";
+// NOTE(review): unlike the other keys, the next two have no "parquet." prefix - presumably
+// intentional to match externally defined option names; confirm.
+static inline const char PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL[] = 
"zlib.compress.level";
+static inline const char PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL[] = 
"compression.brotli.quality";
+static inline const char PARQUET_WRITER_MAX_MEMORY_USE[] = 
"parquet.writer.max.memory.use";
+static constexpr uint64_t DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE = 512 * 1024 * 
1024;  // 512MB
+
+// read
+static inline const char PARQUET_READ_EXECUTOR_THREAD_COUNT[] =
+    "parquet.read.executor.thread-count";
+static constexpr uint32_t DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT = 3;
+static inline const char PARQUET_READ_CACHE_OPTION_LAZY[] = 
"parquet.read.cache-option.lazy";
+static inline const char PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT[] =
+    "parquet.read.cache-option.prefetch-limit";
+static inline const char PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT[] =
+    "parquet.read.cache-option.range-size-limit";
+
+// A stack overflow may happen when the number of predicate nodes is too large, so the number
+// of predicate nodes is limited. The predicate is not pushed down when the limit is exceeded.
+static inline const char PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT[] =
+    "parquet.read.predicate-node-count-limit";
+
+// Default is true. Compaction sets it to false to reduce memory consumption.
+static inline const char PARQUET_READ_ENABLE_PRE_BUFFER[] = 
"parquet.read.enable-pre-buffer";
+
+static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT = 0;
+static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT = 
32 * 1024 * 1024;
+static constexpr uint32_t DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT = 
512;
+
+// Names of the metrics reported by the parquet format, grouped by write and read side.
+class ParquetMetrics {
+ public:
+    // write
+    static inline const char WRITE_RECORD_COUNT[] = 
"parquet.write.record.count";
+
+    // read
+    static inline const char READ_ROW_GROUPS_TOTAL[] = 
"parquet.read.row-groups.total";
+    static inline const char READ_ROW_GROUPS_AFTER_FILTER[] =
+        "parquet.read.row-groups.after-filter";
+    static inline const char READ_ROWS[] = "parquet.read.rows";
+    static inline const char READ_BATCH_COUNT[] = "parquet.read.batch-count";
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_reader_builder.h 
b/src/paimon/format/parquet/parquet_reader_builder.h
new file mode 100644
index 0000000..2a93696
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_reader_builder.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+/// ReaderBuilder implementation that creates Parquet-backed batch readers.
+///
+/// Stores the format options, the batch size and the memory pool, and hands
+/// them to ParquetFileBatchReader::Create() when Build() is called.
+class ParquetReaderBuilder : public ReaderBuilder {
+ public:
+    /// @param options    format options; copied into the builder.
+    /// @param batch_size batch size passed through to every created reader.
+    ParquetReaderBuilder(const std::map<std::string, std::string>& options,
+                         int32_t batch_size)
+        : batch_size_(batch_size), pool_(GetDefaultPool()), options_(options) {}
+
+    /// Replaces the memory pool used by readers built afterwards.
+    /// @return this builder, so calls can be chained.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
+        pool_ = pool;
+        return this;
+    }
+
+    /// Builds a reader on top of an already opened input stream.
+    ///
+    /// @param path the input stream to read from (a stream, despite the
+    ///             parameter name); must not be null.
+    /// @return the reader, or an error status when the stream is null, its
+    ///         length cannot be obtained, or the reader cannot be created.
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        // Reject a null stream up front instead of dereferencing it below.
+        if (path == nullptr) {
+            return Status::Invalid("input stream is null in parquet reader builder");
+        }
+        PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, path->Length());
+        std::shared_ptr<arrow::MemoryPool> arrow_pool = GetArrowPool(pool_);
+        auto input_stream =
+            std::make_unique<ArrowInputStreamAdapter>(path, arrow_pool, file_length);
+        return ParquetFileBatchReader::Create(std::move(input_stream), arrow_pool, options_,
+                                              batch_size_);
+    }
+
+    /// Building from a file path is not supported by the parquet format.
+    /// @return always an Invalid status.
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
+        return Status::Invalid("do not support build reader with path in parquet format");
+    }
+
+ private:
+    int32_t batch_size_ = -1;
+    std::shared_ptr<MemoryPool> pool_;
+    std::map<std::string, std::string> options_;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_schema_util.cpp 
b/src/paimon/format/parquet/parquet_schema_util.cpp
new file mode 100644
index 0000000..b94f32d
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_schema_util.cpp
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adapted from Apache Arrow
+// 
https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/schema_internal.cc
+
+#include "paimon/format/parquet/parquet_schema_util.h"
+
+#include <string>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+using ArrowType = ::arrow::DataType;
+using ArrowTypeId = ::arrow::Type;
+using ParquetType = parquet::Type;
+
+namespace paimon::parquet {
+
+using ::arrow::Result;
+using ::arrow::Status;
+using ::arrow::internal::checked_cast;
+
+// Maps a Parquet DECIMAL annotation to an Arrow decimal type: Decimal128 when
+// the precision fits into it, Decimal256 otherwise.
+Result<std::shared_ptr<ArrowType>> MakeArrowDecimal(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& decimal_annotation =
+        checked_cast<const ::parquet::DecimalLogicalType&>(logical_type);
+    const auto precision = decimal_annotation.precision();
+    const auto scale = decimal_annotation.scale();
+    if (precision > ::arrow::Decimal128Type::kMaxPrecision) {
+        return ::arrow::Decimal256Type::Make(precision, scale);
+    }
+    return ::arrow::Decimal128Type::Make(precision, scale);
+}
+
+// Resolves an INT annotation to the Arrow integer type of the same bit width
+// and signedness. Only widths 8, 16 and 32 are accepted; anything else is
+// reported as a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowInt(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& int_annotation =
+        checked_cast<const ::parquet::IntLogicalType&>(logical_type);
+    const bool is_signed = int_annotation.is_signed();
+    const auto width = int_annotation.bit_width();
+    if (width == 8) {
+        return is_signed ? ::arrow::int8() : ::arrow::uint8();
+    }
+    if (width == 16) {
+        return is_signed ? ::arrow::int16() : ::arrow::uint16();
+    }
+    if (width == 32) {
+        return is_signed ? ::arrow::int32() : ::arrow::uint32();
+    }
+    return Status::TypeError(logical_type.ToString(),
+                             " cannot annotate physical type Int32");
+}
+
+// Resolves an INT annotation to int64/uint64. Only a bit width of 64 is
+// accepted; anything else is reported as a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowInt64(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& int_annotation =
+        checked_cast<const ::parquet::IntLogicalType&>(logical_type);
+    if (int_annotation.bit_width() != 64) {
+        return Status::TypeError(logical_type.ToString(),
+                                 " cannot annotate physical type Int64");
+    }
+    if (int_annotation.is_signed()) {
+        return ::arrow::int64();
+    }
+    return ::arrow::uint64();
+}
+
+// Maps a TIME annotation to arrow::time32. Only millisecond precision is
+// accepted; other units are reported as a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowTime32(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& time_annotation =
+        checked_cast<const ::parquet::TimeLogicalType&>(logical_type);
+    if (time_annotation.time_unit() == ::parquet::LogicalType::TimeUnit::MILLIS) {
+        return ::arrow::time32(::arrow::TimeUnit::MILLI);
+    }
+    return Status::TypeError(logical_type.ToString(),
+                             " cannot annotate physical type Time32");
+}
+
+// Maps a TIME annotation to arrow::time64. Microsecond and nanosecond
+// precision are accepted; other units are reported as a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowTime64(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& time_annotation =
+        checked_cast<const ::parquet::TimeLogicalType&>(logical_type);
+    const auto unit = time_annotation.time_unit();
+    if (unit == ::parquet::LogicalType::TimeUnit::MICROS) {
+        return ::arrow::time64(::arrow::TimeUnit::MICRO);
+    }
+    if (unit == ::parquet::LogicalType::TimeUnit::NANOS) {
+        return ::arrow::time64(::arrow::TimeUnit::NANO);
+    }
+    return Status::TypeError(logical_type.ToString(),
+                             " cannot annotate physical type Time64");
+}
+
+// Maps a TIMESTAMP annotation to an Arrow timestamp of the same unit.
+// Timestamps flagged as adjusted-to-UTC get the local timezone name attached;
+// all others are created without a timezone. Unknown units are a TypeError.
+Result<std::shared_ptr<ArrowType>> MakeArrowTimestamp(
+    const ::parquet::LogicalType& logical_type) {
+    const auto& ts_annotation =
+        checked_cast<const ::parquet::TimestampLogicalType&>(logical_type);
+    const bool adjusted_to_utc = ts_annotation.is_adjusted_to_utc();
+    auto local_timezone = DateTimeUtils::GetLocalTimezoneName();
+
+    // First translate the Parquet unit into the Arrow unit ...
+    ::arrow::TimeUnit::type arrow_unit;
+    switch (ts_annotation.time_unit()) {
+        case ::parquet::LogicalType::TimeUnit::MILLIS:
+            arrow_unit = ::arrow::TimeUnit::MILLI;
+            break;
+        case ::parquet::LogicalType::TimeUnit::MICROS:
+            arrow_unit = ::arrow::TimeUnit::MICRO;
+            break;
+        case ::parquet::LogicalType::TimeUnit::NANOS:
+            arrow_unit = ::arrow::TimeUnit::NANO;
+            break;
+        default:
+            return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
+                                     logical_type.ToString());
+    }
+
+    // ... then decide whether a timezone is attached.
+    if (adjusted_to_utc) {
+        return ::arrow::timestamp(arrow_unit, local_timezone);
+    }
+    return ::arrow::timestamp(arrow_unit);
+}
+
+// Arrow type for a BYTE_ARRAY column: STRING -> utf8, DECIMAL -> decimal,
+// NONE / ENUM / JSON / BSON -> binary. Other annotations are NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromByteArray(
+    const ::parquet::LogicalType& logical_type) {
+    using Kind = ::parquet::LogicalType::Type;
+    const auto kind = logical_type.type();
+    if (kind == Kind::STRING) {
+        return ::arrow::utf8();
+    }
+    if (kind == Kind::DECIMAL) {
+        return MakeArrowDecimal(logical_type);
+    }
+    const bool plain_binary = kind == Kind::NONE || kind == Kind::ENUM ||
+                              kind == Kind::JSON || kind == Kind::BSON;
+    if (plain_binary) {
+        return ::arrow::binary();
+    }
+    return Status::NotImplemented("Unhandled logical logical_type ",
+                                  logical_type.ToString(), " for binary array");
+}
+
+// Arrow type for a FIXED_LEN_BYTE_ARRAY column: DECIMAL -> decimal,
+// FLOAT16 -> float16, NONE / INTERVAL / UUID -> fixed_size_binary of
+// `physical_length`. Other annotations are NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromFLBA(const ::parquet::LogicalType& logical_type,
+                                            int32_t physical_length) {
+    using Kind = ::parquet::LogicalType::Type;
+    const auto kind = logical_type.type();
+    if (kind == Kind::DECIMAL) {
+        return MakeArrowDecimal(logical_type);
+    }
+    if (kind == Kind::FLOAT16) {
+        return ::arrow::float16();
+    }
+    const bool raw_bytes =
+        kind == Kind::NONE || kind == Kind::INTERVAL || kind == Kind::UUID;
+    if (raw_bytes) {
+        return ::arrow::fixed_size_binary(physical_length);
+    }
+    return Status::NotImplemented("Unhandled logical logical_type ",
+                                  logical_type.ToString(),
+                                  " for fixed-length binary array");
+}
+
+// Arrow type for an INT32 column: INT -> sized integer, DATE -> date32,
+// TIME -> time32, DECIMAL -> decimal, NONE -> int32. Other annotations are
+// NotImplemented.
+::arrow::Result<std::shared_ptr<ArrowType>> FromInt32(
+    const ::parquet::LogicalType& logical_type) {
+    using Kind = ::parquet::LogicalType::Type;
+    const auto kind = logical_type.type();
+    if (kind == Kind::NONE) {
+        return ::arrow::int32();
+    }
+    if (kind == Kind::INT) {
+        return MakeArrowInt(logical_type);
+    }
+    if (kind == Kind::DATE) {
+        return ::arrow::date32();
+    }
+    if (kind == Kind::TIME) {
+        return MakeArrowTime32(logical_type);
+    }
+    if (kind == Kind::DECIMAL) {
+        return MakeArrowDecimal(logical_type);
+    }
+    return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                  " for INT32");
+}
+
+// Arrow type for an INT64 column: INT -> int64/uint64, DECIMAL -> decimal,
+// TIMESTAMP -> timestamp, TIME -> time64, NONE -> int64. Other annotations
+// are NotImplemented.
+Result<std::shared_ptr<ArrowType>> FromInt64(
+    const ::parquet::LogicalType& logical_type) {
+    using Kind = ::parquet::LogicalType::Type;
+    const auto kind = logical_type.type();
+    if (kind == Kind::NONE) {
+        return ::arrow::int64();
+    }
+    if (kind == Kind::INT) {
+        return MakeArrowInt64(logical_type);
+    }
+    if (kind == Kind::DECIMAL) {
+        return MakeArrowDecimal(logical_type);
+    }
+    if (kind == Kind::TIMESTAMP) {
+        return MakeArrowTimestamp(logical_type);
+    }
+    if (kind == Kind::TIME) {
+        return MakeArrowTime64(logical_type);
+    }
+    return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                  " for INT64");
+}
+
+// Derives the Arrow type of a Parquet primitive column from its physical
+// type and logical annotation.
+//
+// An invalid or null logical type yields arrow::null(). INT96 is read as a
+// timezone-less timestamp of `int96_arrow_time_unit`; `type_length` is only
+// used for FIXED_LEN_BYTE_ARRAY. An unknown physical type is an IOError.
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    ::parquet::Type::type physical_type, const ::parquet::LogicalType& logical_type,
+    int32_t type_length, const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const bool no_usable_annotation = logical_type.is_invalid() || logical_type.is_null();
+    if (no_usable_annotation) {
+        return ::arrow::null();
+    }
+
+    switch (physical_type) {
+        // Physical types whose Arrow type does not depend on the annotation.
+        case ParquetType::BOOLEAN:
+            return ::arrow::boolean();
+        case ParquetType::FLOAT:
+            return ::arrow::float32();
+        case ParquetType::DOUBLE:
+            return ::arrow::float64();
+        case ParquetType::INT96:
+            return ::arrow::timestamp(int96_arrow_time_unit);
+        // Physical types that are refined by the logical annotation.
+        case ParquetType::INT32:
+            return FromInt32(logical_type);
+        case ParquetType::INT64:
+            return FromInt64(logical_type);
+        case ParquetType::BYTE_ARRAY:
+            return FromByteArray(logical_type);
+        case ParquetType::FIXED_LEN_BYTE_ARRAY:
+            return FromFLBA(logical_type, type_length);
+        default:
+            // PARQUET-1565: This can occur if the file is corrupt
+            return Status::IOError("Invalid physical column type: ",
+                                   TypeToString(physical_type));
+    }
+}
+
+// Convenience overload: takes the physical type, logical type and type
+// length from `primitive` and forwards to the overload above.
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    const ::parquet::schema::PrimitiveNode& primitive,
+    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const ::parquet::LogicalType& annotation = *primitive.logical_type();
+    return GetArrowType(primitive.physical_type(), annotation, primitive.type_length(),
+                        int96_arrow_time_unit);
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_schema_util.h 
b/src/paimon/format/parquet/parquet_schema_util.h
new file mode 100644
index 0000000..b8eea4e
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_schema_util.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adapted from Apache Arrow
+// 
https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/schema_internal.h
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+// Forward declarations of the third-party types used in the signatures below.
+namespace parquet::schema {
+class PrimitiveNode;
+}  // namespace parquet::schema
+
+namespace arrow {
+class DataType;
+}
+
+namespace paimon::parquet {
+
+/// Arrow type for a BYTE_ARRAY column with the given logical annotation
+/// (utf8, decimal or binary); unsupported annotations yield NotImplemented.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromByteArray(
+    const ::parquet::LogicalType& logical_type);
+/// Arrow type for a FIXED_LEN_BYTE_ARRAY column of `physical_length` bytes
+/// (decimal, float16 or fixed_size_binary); unsupported annotations yield
+/// NotImplemented.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromFLBA(
+    const ::parquet::LogicalType& logical_type, int32_t physical_length);
+/// Arrow type for an INT32 column with the given logical annotation;
+/// unsupported annotations yield NotImplemented.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromInt32(
+    const ::parquet::LogicalType& logical_type);
+/// Arrow type for an INT64 column with the given logical annotation;
+/// unsupported annotations yield NotImplemented.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> FromInt64(
+    const ::parquet::LogicalType& logical_type);
+
+/// Arrow type for a Parquet primitive described by its physical type,
+/// logical annotation and type length. INT96 columns become timestamps of
+/// `int96_arrow_time_unit`.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
+    ::parquet::Type::type physical_type, const ::parquet::LogicalType& 
logical_type,
+    int32_t type_length, ::arrow::TimeUnit::type int96_arrow_time_unit = 
::arrow::TimeUnit::NANO);
+
+/// Same as above, reading the physical type, logical annotation and type
+/// length from `primitive`.
+::arrow::Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
+    const ::parquet::schema::PrimitiveNode& primitive,
+    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.cpp 
b/src/paimon/format/parquet/parquet_timestamp_converter.cpp
new file mode 100644
index 0000000..c9c9e75
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter.cpp
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/core/casting/timestamp_to_timestamp_cast_executor.h"
+
+namespace paimon::parquet {
+/// Recursively rewrites every timezone-carrying timestamp inside
+/// `src_data_type` so that it carries the local timezone name.
+///
+/// STRUCT, MAP and LIST types are rebuilt from their adjusted children.
+/// Timestamps without a timezone and all other types are returned unchanged.
+///
+/// @param src_data_type the type to adjust.
+/// @return the adjusted type, or the error raised while adjusting a child.
+Result<std::shared_ptr<arrow::DataType>> ParquetTimestampConverter::AdjustTimezone(
+    const std::shared_ptr<arrow::DataType>& src_data_type) {
+    arrow::Type::type type = src_data_type->id();
+    switch (type) {
+        case arrow::Type::type::STRUCT: {
+            auto* src_struct_type =
+                arrow::internal::checked_cast<arrow::StructType*>(src_data_type.get());
+            arrow::FieldVector new_fields;
+            new_fields.reserve(src_struct_type->num_fields());
+            for (int32_t i = 0; i < src_struct_type->num_fields(); ++i) {
+                PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> sub_type,
+                                       AdjustTimezone(src_struct_type->field(i)->type()));
+                // WithType() keeps the rest of the field and swaps only the type.
+                new_fields.push_back(src_struct_type->field(i)->WithType(sub_type));
+            }
+            return arrow::struct_(new_fields);
+        }
+        case arrow::Type::type::MAP: {
+            auto* src_map_type =
+                arrow::internal::checked_cast<arrow::MapType*>(src_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> key_type,
+                                   AdjustTimezone(src_map_type->key_type()));
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> item_type,
+                                   AdjustTimezone(src_map_type->item_type()));
+            return std::make_shared<arrow::MapType>(
+                src_map_type->key_field()->WithType(key_type),
+                src_map_type->item_field()->WithType(item_type));
+        }
+        case arrow::Type::type::LIST: {
+            auto* src_list_type =
+                arrow::internal::checked_cast<arrow::ListType*>(src_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> value_type,
+                                   AdjustTimezone(src_list_type->value_type()));
+            return arrow::list(src_list_type->value_field()->WithType(value_type));
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto* src_ts_type =
+                arrow::internal::checked_cast<arrow::TimestampType*>(src_data_type.get());
+            if (!src_ts_type->timezone().empty()) {
+                return arrow::timestamp(src_ts_type->unit(),
+                                        DateTimeUtils::GetLocalTimezoneName());
+            }
+            // A timestamp without a timezone stays as it is. Returning here
+            // explicitly avoids falling through into the default label.
+            return src_data_type;
+        }
+        default:
+            return src_data_type;
+    }
+}
+
+/// Reports whether an array of `src_data_type` must be cast to reach
+/// `target_data_type` because of a timestamp difference.
+///
+/// A cast is needed when any timestamp, at the top level or nested inside a
+/// STRUCT, MAP or LIST, differs from its target in unit or timezone. The two
+/// types must have the same type id, and structs the same number of fields,
+/// otherwise an Invalid status is returned.
+Result<bool> ParquetTimestampConverter::NeedCastArrayForTimestamp(
+    const std::shared_ptr<arrow::DataType>& src_data_type,
+    const std::shared_ptr<arrow::DataType>& target_data_type) {
+    const arrow::Type::type type_id = src_data_type->id();
+    if (target_data_type->id() != type_id) {
+        return Status::Invalid(fmt::format("src type {} and target type {} mismatch",
+                                           src_data_type->ToString(),
+                                           target_data_type->ToString()));
+    }
+
+    switch (type_id) {
+        case arrow::Type::type::TIMESTAMP: {
+            const auto& src_ts =
+                arrow::internal::checked_cast<const arrow::TimestampType&>(*src_data_type);
+            const auto& target_ts =
+                arrow::internal::checked_cast<const arrow::TimestampType&>(*target_data_type);
+            const bool identical = src_ts.unit() == target_ts.unit() &&
+                                   src_ts.timezone() == target_ts.timezone();
+            return !identical;
+        }
+        case arrow::Type::type::LIST: {
+            const auto& src_list =
+                arrow::internal::checked_cast<const arrow::ListType&>(*src_data_type);
+            const auto& target_list =
+                arrow::internal::checked_cast<const arrow::ListType&>(*target_data_type);
+            // The answer for a list is the answer for its element type.
+            return NeedCastArrayForTimestamp(src_list.value_type(), target_list.value_type());
+        }
+        case arrow::Type::type::MAP: {
+            const auto& src_map =
+                arrow::internal::checked_cast<const arrow::MapType&>(*src_data_type);
+            const auto& target_map =
+                arrow::internal::checked_cast<const arrow::MapType&>(*target_data_type);
+            PAIMON_ASSIGN_OR_RAISE(
+                bool key_needs_cast,
+                NeedCastArrayForTimestamp(src_map.key_type(), target_map.key_type()));
+            if (key_needs_cast) {
+                return true;
+            }
+            // Keys match, so the items decide.
+            return NeedCastArrayForTimestamp(src_map.item_type(), target_map.item_type());
+        }
+        case arrow::Type::type::STRUCT: {
+            const auto& src_struct =
+                arrow::internal::checked_cast<const arrow::StructType&>(*src_data_type);
+            const auto& target_struct =
+                arrow::internal::checked_cast<const arrow::StructType&>(*target_data_type);
+            const int32_t field_count = src_struct.num_fields();
+            if (field_count != target_struct.num_fields()) {
+                return Status::Invalid(
+                    fmt::format("src type {} and target type {} number of fields mismatch",
+                                src_data_type->ToString(), target_data_type->ToString()));
+            }
+            // Stop at the first child that needs a cast.
+            for (int32_t idx = 0; idx < field_count; ++idx) {
+                PAIMON_ASSIGN_OR_RAISE(
+                    bool child_needs_cast,
+                    NeedCastArrayForTimestamp(src_struct.field(idx)->type(),
+                                              target_struct.field(idx)->type()));
+                if (child_needs_cast) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        default:
+            return false;
+    }
+}
+
+/// Converts the timestamps inside `array` so that the result matches
+/// `target_data_type`, recursing through STRUCT, MAP and LIST arrays.
+///
+/// For a timestamp array:
+///  - milli -> second is done with TimestampToTimestampCastExecutor;
+///  - otherwise, if only the timezone differs, the array is re-viewed as the
+///    target type;
+///  - otherwise the array is returned unchanged.
+/// Arrays of any other type are returned unchanged.
+///
+/// @param array            the array to convert.
+/// @param target_data_type the type the result should have.
+/// @param arrow_pool       pool handed to the cast executor.
+/// @return the converted array, or the error raised by a nested conversion.
+Result<std::shared_ptr<arrow::Array>> ParquetTimestampConverter::CastArrayForTimestamp(
+    const std::shared_ptr<arrow::Array>& array,
+    const std::shared_ptr<arrow::DataType>& target_data_type,
+    const std::shared_ptr<arrow::MemoryPool>& arrow_pool) {
+    arrow::Type::type type = array->type()->id();
+    switch (type) {
+        case arrow::Type::type::STRUCT: {
+            auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(array.get());
+            // The loop below indexes the target's fields with the source's
+            // indices, so the field counts must agree.
+            if (target_data_type->num_fields() != struct_array->num_fields()) {
+                return Status::Invalid(
+                    fmt::format("src type {} and target type {} number of fields mismatch",
+                                array->type()->ToString(), target_data_type->ToString()));
+            }
+            arrow::ArrayVector target_sub_arrays;
+            std::vector<std::string> target_names;
+            target_sub_arrays.reserve(struct_array->num_fields());
+            target_names.reserve(struct_array->num_fields());
+            for (int32_t i = 0; i < struct_array->num_fields(); i++) {
+                const auto& field = struct_array->field(i);
+                PAIMON_ASSIGN_OR_RAISE(
+                    std::shared_ptr<arrow::Array> sub_array,
+                    CastArrayForTimestamp(field, target_data_type->field(i)->type(), arrow_pool));
+                target_sub_arrays.push_back(sub_array);
+                target_names.push_back(target_data_type->field(i)->name());
+            }
+            // NOTE(review): StructArray::field(i) appears to return children
+            // already sliced by the struct's offset, while the offset is passed
+            // to Make() again - confirm the result for arrays with a non-zero
+            // offset.
+            PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+                std::shared_ptr<arrow::Array> new_array,
+                arrow::StructArray::Make(target_sub_arrays, target_names,
+                                         struct_array->null_bitmap(), struct_array->null_count(),
+                                         struct_array->offset()));
+            return new_array;
+        }
+        case arrow::Type::type::MAP: {
+            auto* map_array = arrow::internal::checked_cast<arrow::MapArray*>(array.get());
+            auto* map_type = arrow::internal::checked_cast<arrow::MapType*>(target_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> key_array,
+                CastArrayForTimestamp(map_array->keys(), map_type->key_type(), arrow_pool));
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> item_array,
+                CastArrayForTimestamp(map_array->items(), map_type->item_type(), arrow_pool));
+            // The map is rebuilt around the converted children; offsets, null
+            // bitmap, null count and offset are taken over from the source.
+            return std::make_shared<arrow::MapArray>(
+                arrow::map(key_array->type(), item_array->type()), map_array->length(),
+                map_array->value_offsets(), key_array, item_array, map_array->null_bitmap(),
+                map_array->null_count(), map_array->offset());
+        }
+        case arrow::Type::type::LIST: {
+            auto* list_array = arrow::internal::checked_cast<arrow::ListArray*>(array.get());
+            auto* list_type =
+                arrow::internal::checked_cast<arrow::ListType*>(target_data_type.get());
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<arrow::Array> value_array,
+                CastArrayForTimestamp(list_array->values(), list_type->value_type(), arrow_pool));
+            return std::make_shared<arrow::ListArray>(
+                arrow::list(value_array->type()), list_array->length(), list_array->value_offsets(),
+                value_array, list_array->null_bitmap(), list_array->null_count(),
+                list_array->offset());
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto* ts_array = arrow::internal::checked_cast<arrow::TimestampArray*>(array.get());
+            auto* src_type =
+                arrow::internal::checked_cast<arrow::TimestampType*>(ts_array->type().get());
+            auto* ts_target_type =
+                arrow::internal::checked_cast<arrow::TimestampType*>(target_data_type.get());
+            if (src_type->unit() == arrow::TimeUnit::type::MILLI &&
+                ts_target_type->unit() == arrow::TimeUnit::type::SECOND) {
+                // The parquet writer does not support seconds and writes them
+                // as millis, so the paimon file reader casts millis back to
+                // seconds here.
+                auto cast_executor = std::make_shared<TimestampToTimestampCastExecutor>();
+                PAIMON_ASSIGN_OR_RAISE(
+                    std::shared_ptr<arrow::Array> target_array,
+                    cast_executor->Cast(array, target_data_type, arrow_pool.get()));
+                return target_array;
+            }
+            if (src_type->timezone() != ts_target_type->timezone()) {
+                // 1. For nano precision the parquet writer writes int96, which
+                //    carries no stats or zone info, so the paimon file reader
+                //    adds the zone info of the target type.
+                // 2. For other precisions the parquet reader returns a UTC
+                //    timezone, so the paimon file reader adds the local zone.
+                PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> target_array,
+                                                  ts_array->View(target_data_type));
+                return target_array;
+            }
+            // Nothing to convert. Returning here explicitly avoids falling
+            // through into the default label.
+            return array;
+        }
+        default:
+            return array;
+    }
+}
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.h 
b/src/paimon/format/parquet/parquet_timestamp_converter.h
new file mode 100644
index 0000000..2954bfe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "paimon/result.h"
+
+namespace paimon::parquet {
+
+/// Static helpers that reconcile Arrow timestamp types and arrays with the
+/// types expected by the reader. The class cannot be instantiated.
+class ParquetTimestampConverter {
+ public:
+    ParquetTimestampConverter() = delete;
+    ~ParquetTimestampConverter() = delete;
+
+    /// Returns `src_data_type` with its timestamp types adjusted for timezone.
+    static Result<std::shared_ptr<arrow::DataType>> AdjustTimezone(
+        const std::shared_ptr<arrow::DataType>& src_data_type);
+
+    /// Reports whether CastArrayForTimestamp() is needed to get from
+    /// `src_data_type` to `target_data_type`.
+    static Result<bool> NeedCastArrayForTimestamp(
+        const std::shared_ptr<arrow::DataType>& src_data_type,
+        const std::shared_ptr<arrow::DataType>& target_data_type);
+
+    /// Converts the timestamps in `array` towards `target_data_type`, taking
+    /// `arrow_pool` as the memory pool argument.
+    static Result<std::shared_ptr<arrow::Array>> CastArrayForTimestamp(
+        const std::shared_ptr<arrow::Array>& array,
+        const std::shared_ptr<arrow::DataType>& target_data_type,
+        const std::shared_ptr<arrow::MemoryPool>& arrow_pool);
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp 
b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp
new file mode 100644
index 0000000..4d764e1
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::parquet::test {
+
+TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {  // Each scoped case pairs a source and a target struct type and expects need_cast == true.
+    {
+        // Top-level field: same unit, but the target adds the "UTC" timezone -> cast needed.
+        arrow::FieldVector fields = {
+            arrow::field("f0", arrow::timestamp(arrow::TimeUnit::NANO)),
+        };
+        arrow::FieldVector target_fields = {
+            arrow::field("f0", arrow::timestamp(arrow::TimeUnit::NANO, "UTC")),
+        };
+        ASSERT_OK_AND_ASSIGN(bool need_cast,
+                             
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                 arrow::struct_(fields), 
arrow::struct_(target_fields)));
+        ASSERT_TRUE(need_cast);
+    }
+    {
+        // List element: the unit changes from MILLI to SECOND -> cast needed.
+        arrow::FieldVector fields = {
+            arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI))),
+        };
+        arrow::FieldVector target_fields = {
+            arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND)))};
+        ASSERT_OK_AND_ASSIGN(bool need_cast,
+                             
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                 arrow::struct_(fields), 
arrow::struct_(target_fields)));
+        ASSERT_TRUE(need_cast);
+    }
+    {
+        // Map: the key unit changes (MILLI -> SECOND) and the value gains "UTC" -> cast needed.
+        arrow::FieldVector fields = {
+            arrow::field("f1", 
arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+                                          
arrow::timestamp(arrow::TimeUnit::NANO))),
+        };
+        arrow::FieldVector target_fields = {
+            arrow::field("f1", 
arrow::map(arrow::timestamp(arrow::TimeUnit::SECOND),
+                                          
arrow::timestamp(arrow::TimeUnit::NANO, "UTC")))};
+        ASSERT_OK_AND_ASSIGN(bool need_cast,
+                             
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                 arrow::struct_(fields), 
arrow::struct_(target_fields)));
+        ASSERT_TRUE(need_cast);
+    }
+    {
+        // Nested struct: child f0 is unchanged, child f1 gains "UTC" -> cast needed.
+        arrow::FieldVector fields = {
+            arrow::field("f3", arrow::struct_(
+                                   {arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::MILLI)),
+                                    arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::NANO))})),
+        };
+        arrow::FieldVector target_fields = {
+            arrow::field("f3",
+                         arrow::struct_(
+                             {arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::MILLI)),
+                              arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::NANO, "UTC"))})),
+        };
+        ASSERT_OK_AND_ASSIGN(bool need_cast,
+                             
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                 arrow::struct_(fields), 
arrow::struct_(target_fields)));
+        ASSERT_TRUE(need_cast);
+    }
+}
+
+TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) {  // Casts a struct array with nested timestamps and compares it with an array built directly as the target type.
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();  // Used below as the timezone of the target timestamp types.
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+                                      arrow::timestamp(arrow::TimeUnit::MICRO, 
"UTC"))),
+        arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI))),
+        arrow::field("f3", arrow::struct_(
+                               {arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::MILLI, "UTC")),
+                                arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::NANO))})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+    };
+    // Target type: MILLI units become SECOND (f1 key, f2, f3.f0); "UTC" becomes `timezone` (f1 value, f3.f0); zone-less f3.f1 and f4 gain `timezone`.
+    arrow::FieldVector target_fields = {
+        arrow::field("f1", 
arrow::map(arrow::timestamp(arrow::TimeUnit::SECOND),
+                                      arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone))),
+        arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND))),
+        arrow::field("f3",
+                     arrow::struct_(
+                         {arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::SECOND, timezone)),
+                          arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::NANO, timezone))})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO, timezone)),
+    };
+    // Input rows are parsed from JSON as the source type; the last row has null f1, f2 and f3.
+    auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+[[["1970-01-01 00:00:01", "1970-01-01 00:00:00.000001"]], ["1970-01-01 
00:00:02"], ["1970-01-01 00:00:02", "1970-01-01 00:00:00.000000002"], 
"1970-01-01 00:00:00.000000002"],
+[[["1970-01-01 00:00:03", "1970-01-01 00:00:00.000003"]], ["1970-01-01 
00:00:04"], ["1970-01-01 00:00:04", "1970-01-01 00:00:00.000000004"], 
"1970-01-01 00:00:00.000000004"],
+[null, null, null, "1970-01-01 00:00:00.000000004"]
+    ])")
+            .ValueOrDie());
+    // Run the conversion with the Arrow pool obtained via GetArrowPool(GetDefaultPool()).
+    std::shared_ptr<arrow::MemoryPool> pool = GetArrowPool(GetDefaultPool());
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> result_array,
+                         ParquetTimestampConverter::CastArrayForTimestamp(
+                             array, arrow::struct_(target_fields), pool));
+    // The expected array uses the same JSON text as the input, parsed as the target type instead.
+    auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(target_fields), R"([
+[[["1970-01-01 00:00:01", "1970-01-01 00:00:00.000001"]], ["1970-01-01 
00:00:02"], ["1970-01-01 00:00:02", "1970-01-01 00:00:00.000000002"], 
"1970-01-01 00:00:00.000000002"],
+[[["1970-01-01 00:00:03", "1970-01-01 00:00:00.000003"]], ["1970-01-01 
00:00:04"], ["1970-01-01 00:00:04", "1970-01-01 00:00:00.000000004"], 
"1970-01-01 00:00:00.000000004"],
+[null, null, null, "1970-01-01 00:00:00.000000004"]
+    ])")
+            .ValueOrDie());
+    ASSERT_TRUE(result_array->Equals(expected_array)) << 
result_array->ToString();
+}
+
+TEST(ParquetTimestampConverterTest, TestAdjustTimezone) {  // Checks the type returned by AdjustTimezone for a struct with nested timestamp fields.
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();  // Timezone expected on the adjusted types.
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+                                      arrow::timestamp(arrow::TimeUnit::MICRO, 
"UTC"))),
+        arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI, "UTC"))),
+        arrow::field(
+            "f3",
+            arrow::struct_({arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::MILLI)),
+                            arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"))})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+    };
+    // Expected: every "UTC" timestamp carries `timezone` instead; zone-less timestamps (f1 key, f3.f0, f4) and all units are unchanged.
+    arrow::FieldVector target_fields = {
+        arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
+                                      arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone))),
+        arrow::field("f2", 
arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI, timezone))),
+        arrow::field("f3",
+                     arrow::struct_(
+                         {arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::MILLI)),
+                          arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::MICRO, timezone))})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+    };
+    // The returned type must equal the struct built from target_fields.
+    ASSERT_OK_AND_ASSIGN(auto result_type,
+                         
ParquetTimestampConverter::AdjustTimezone(arrow::struct_(fields)));
+    ASSERT_TRUE(result_type->Equals(arrow::struct_(target_fields)));
+}
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_writer_builder.cpp 
b/src/paimon/format/parquet/parquet_writer_builder.cpp
new file mode 100644
index 0000000..7fa1899
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+#include <optional>
+#include <utility>
+
+#include "arrow/util/compression.h"
+#include "arrow/util/type_fwd.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/core/core_options.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/status.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace paimon {
+class OutputStream;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+Result<std::unique_ptr<FormatWriter>> ParquetWriterBuilder::Build(
+    const std::shared_ptr<OutputStream>& out, const std::string& compression) {  // Creates a Parquet FormatWriter for `out`; `compression` is the codec name passed to PrepareWriterProperties.
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::WriterProperties> 
writer_properties,
+                           PrepareWriterProperties(compression));  // NOTE(review): PAIMON_ASSIGN_OR_RAISE presumably returns early on a non-OK Result - macro defined elsewhere.
+    PAIMON_ASSIGN_OR_RAISE(uint64_t max_memory_use, 
OptionsUtils::GetValueFromMap<uint64_t>(
+                                                        options_, 
PARQUET_WRITER_MAX_MEMORY_USE,
+                                                        
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE));
+    // Hand the stream, schema, properties, memory limit and Arrow pool to the concrete writer factory.
+    return ParquetFormatWriter::Create(out, schema_, writer_properties, 
max_memory_use, pool_);
+}
+
+Result<std::shared_ptr<::parquet::WriterProperties>> 
ParquetWriterBuilder::PrepareWriterProperties(
+    const std::string& compression) {  // Translates options_, batch_size_, pool_ and the codec name `compression` into parquet WriterProperties.
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, 
CoreOptions::FromMap(options_));
+    PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type compression_type,
+                           ArrowUtils::GetCompressionType(compression));  // An unrecognized codec name surfaces as an error here (the unit test expects "unknown" to fail).
+    ::parquet::WriterProperties::Builder builder;
+    builder.memory_pool(pool_.get());  // Raw pointer only; the pool itself is held by the pool_ shared_ptr member.
+    builder.write_batch_size(batch_size_);
+    builder.compression(compression_type);
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t row_group_length,
+        OptionsUtils::GetValueFromMap<int64_t>(options_, 
PARQUET_WRITE_MAX_ROW_GROUP_LENGTH,
+                                               
DEFAULT_PARQUET_WRITE_MAX_ROW_GROUP_LENGTH));
+    builder.max_row_group_length(row_group_length);
+    builder.enable_store_decimal_as_integer();  // Always enabled; no option in options_ controls it.
+    if (arrow::util::Codec::SupportsCompressionLevel(compression_type)) {  // A level is only configured for codecs that accept one.
+        std::string key = CompressLevelOptionsKey(compression_type);  // Empty for codecs other than GZIP/BROTLI/ZSTD.
+        PAIMON_ASSIGN_OR_RAISE(
+            int64_t file_compression_level,
+            OptionsUtils::GetValueFromMap<int64_t>(options_, key,
+                                                   (key == 
PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL)
+                                                       ? 
core_options.GetFileCompressionZstdLevel()
+                                                       : 1));  // Fallback level: the core zstd level for ZSTD, otherwise 1.
+        builder.compression_level(file_compression_level);
+    }
+    // The PARQUET_BLOCK_SIZE option feeds parquet's max row group size, defaulting to parquet's own constant.
+    PAIMON_ASSIGN_OR_RAISE(int64_t row_group_size, 
OptionsUtils::GetValueFromMap<int64_t>(
+                                                       options_, 
PARQUET_BLOCK_SIZE,
+                                                       
::parquet::DEFAULT_MAX_ROW_GROUP_SIZE));
+    builder.max_row_group_size(row_group_size);
+    // Data page size, dictionary settings and format version follow, each read from options_ with a default.
+    PAIMON_ASSIGN_OR_RAISE(int64_t page_size,
+                           OptionsUtils::GetValueFromMap<int64_t>(options_, 
PARQUET_PAGE_SIZE,
+                                                                  
::parquet::kDefaultDataPageSize));
+    builder.data_pagesize(page_size);
+    PAIMON_ASSIGN_OR_RAISE(bool enable_dictionary, 
OptionsUtils::GetValueFromMap<bool>(
+                                                       options_, 
PARQUET_ENABLE_DICTIONARY,
+                                                       
::parquet::DEFAULT_IS_DICTIONARY_ENABLED));
+    enable_dictionary ? builder.enable_dictionary() : 
builder.disable_dictionary();  // The conditional expression is evaluated only for its side effect on builder.
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t dictionary_page_size,
+        OptionsUtils::GetValueFromMap<int64_t>(options_, 
PARQUET_DICTIONARY_PAGE_SIZE,
+                                               
::parquet::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT));
+    builder.dictionary_pagesize_limit(dictionary_page_size);
+    PAIMON_ASSIGN_OR_RAISE(std::string writer_version,
+                           OptionsUtils::GetValueFromMap<std::string>(
+                               options_, PARQUET_WRITER_VERSION, 
std::string("PARQUET_2_0")));  // Writer version defaults to "PARQUET_2_0" when the option is not set.
+    PAIMON_ASSIGN_OR_RAISE(::parquet::ParquetVersion::type version,
+                           ConvertWriterVersion(writer_version));  // Unrecognized version strings are rejected here.
+    builder.version(version);
+    return builder.build();
+}
+
+Result<::parquet::ParquetVersion::type> 
ParquetWriterBuilder::ConvertWriterVersion(
+    const std::string& writer_version) {  // Maps a writer-version option string to a parquet version enum; comparison is exact (case-sensitive).
+    if (writer_version == "PARQUET_1_0" || writer_version == "v1") {
+        return ::parquet::ParquetVersion::type::PARQUET_1_0;
+    } else if (writer_version == "PARQUET_2_0" || writer_version == "v2") {  // "PARQUET_2_0"/"v2" deliberately select the C++ PARQUET_2_6 enum value.
+        return ::parquet::ParquetVersion::type::PARQUET_2_6;
+    }
+    return Status::Invalid(fmt::format("Unknown writer version {}", 
writer_version));  // Any other string yields an Invalid status naming the offending value.
+}
+
+std::string 
ParquetWriterBuilder::CompressLevelOptionsKey(::arrow::Compression::type 
compression) {
+    switch (compression) {  // Returns the option key that holds the compression level for the given codec.
+        case ::arrow::Compression::GZIP:
+            return PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL;  // GZIP reads its level from the "zlib" level key.
+        case ::arrow::Compression::BROTLI:
+            return PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL;
+        case ::arrow::Compression::ZSTD:
+            return PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL;
+        default:
+            return "";  // No dedicated level key for any other codec.
+    }
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_writer_builder.h 
b/src/paimon/format/parquet/parquet_writer_builder.h
new file mode 100644
index 0000000..7862535
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/util/type_fwd.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+namespace paimon {
+class OutputStream;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+class ParquetWriterBuilder : public WriterBuilder {  // WriterBuilder implementation that configures and creates Parquet format writers.
+ public:
+    ParquetWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t 
batch_size,
+                         const std::map<std::string, std::string>& options)
+        : batch_size_(batch_size),
+          pool_(GetArrowPool(GetDefaultPool())),  // Initial pool; can be replaced through WithMemoryPool().
+          schema_(schema),
+          options_(options) {  // The option map is copied into the builder.
+        assert(schema);  // Standard assert: checked only when NDEBUG is not defined.
+    }
+    // Replaces the Arrow pool with one obtained from `pool` and returns this builder.
+    WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) 
override {
+        pool_ = GetArrowPool(pool);
+        return this;
+    }
+    // Creates the writer for `out`; `compression` is a codec name. Defined out of line.
+    Result<std::unique_ptr<FormatWriter>> Build(const 
std::shared_ptr<OutputStream>& out,
+                                                const std::string& 
compression) override;
+
+ private:
+    static Result<::parquet::ParquetVersion::type> ConvertWriterVersion(
+        const std::string& writer_version);  // Maps a version option string to a parquet version enum, or an error Result.
+    // Option key that stores the compression level for `compression`.
+    static std::string CompressLevelOptionsKey(::arrow::Compression::type 
compression);
+    // Builds parquet WriterProperties for the codec named by `compression`.
+    Result<std::shared_ptr<::parquet::WriterProperties>> 
PrepareWriterProperties(
+        const std::string& compression);
+    // State captured at construction; pool_ may later be replaced by WithMemoryPool().
+    int32_t batch_size_ = -1;
+    std::shared_ptr<arrow::MemoryPool> pool_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::map<std::string, std::string> options_;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_writer_builder_test.cpp 
b/src/paimon/format/parquet/parquet_writer_builder_test.cpp
new file mode 100644
index 0000000..2c5de2c
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_writer_builder_test.cpp
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_writer_builder.h"
+
+#include <limits>
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon::parquet::test {
+
+TEST(ParquetWriterBuilderTest, DefaultPrepareWriterProperties) {  // Verifies the writer properties produced when no parquet-specific option is set.
+    arrow::FieldVector fields;
+    std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);  // Empty schema; only the properties are inspected.
+    std::map<std::string, std::string> options;
+    options[Options::FILE_FORMAT] = "parquet";
+    options[Options::MANIFEST_FORMAT] = "parquet";
+    ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);  // NOTE(review): PrepareWriterProperties is declared private in the builder header - confirm how this test is granted access.
+    ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("zstd"));
+    ASSERT_EQ(::parquet::ParquetVersion::PARQUET_2_6, properties->version());  // Expected result of the default writer version string.
+    ASSERT_EQ(1024 * 1024, properties->dictionary_pagesize_limit());
+    ASSERT_EQ(1024 * 1024, properties->data_pagesize());
+    ASSERT_EQ(std::numeric_limits<int64_t>::max(), 
properties->max_row_group_length());
+    ASSERT_EQ(128 * 1024 * 1024, properties->max_row_group_size());
+    ASSERT_EQ(1024, properties->write_batch_size());  // Mirrors the batch_size passed to the constructor.
+    ASSERT_EQ(1, properties->default_column_properties().compression_level());  // Expected zstd level when neither level option is set.
+    ASSERT_TRUE(properties->store_decimal_as_integer());
+}
+
+TEST(ParquetWriterBuilderTest, PrepareWriterProperties) {  // Verifies that explicitly set parquet options show up in the writer properties.
+    arrow::FieldVector fields;
+    std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+    std::map<std::string, std::string> options;
+    options[PARQUET_PAGE_SIZE] = "1024";
+    options[PARQUET_DICTIONARY_PAGE_SIZE] = "4096";
+    options[PARQUET_WRITER_VERSION] = "PARQUET_2_0";
+    options[PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL] = "3";
+    options[PARQUET_BLOCK_SIZE] = "2048";  // Checked below through max_row_group_size().
+    options[Options::FILE_FORMAT] = "parquet";
+    options[Options::MANIFEST_FORMAT] = "parquet";
+    ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024, options);
+    ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("zstd"));
+    // Version numbering differs between C++ and Java Parquet implementations. 
Java's PARQUET_2_0
+    // format version corresponds to C++'s PARQUET_2_6 enum value.
+    ASSERT_EQ(::parquet::ParquetVersion::PARQUET_2_6, properties->version());
+    ASSERT_EQ(4096, properties->dictionary_pagesize_limit());
+    ASSERT_EQ(1024, properties->data_pagesize());
+    ASSERT_EQ(2048, properties->max_row_group_size());
+    ASSERT_EQ(1024 * 1024, properties->write_batch_size());
+    ASSERT_EQ(3, properties->default_column_properties().compression_level());  // Taken from the parquet zstd level option set above.
+}
+
+TEST(ParquetWriterBuilderTest, PrepareWriterPropertiesWithZstdLevelPriority) {  // Checks which zstd level option wins when both, or only the generic one, are set.
+    arrow::FieldVector fields;
+    std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+    {  // Both options set: the parquet-specific level (3) is expected over the generic file level (4).
+        std::map<std::string, std::string> options;
+        options[Options::FILE_COMPRESSION_ZSTD_LEVEL] = "4";
+        options[PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL] = "3";
+        options[PARQUET_WRITER_VERSION] = "PARQUET_1_0";
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024, 
options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("zstd"));
+        ASSERT_EQ(3, 
properties->default_column_properties().compression_level());
+        ASSERT_EQ(::parquet::ParquetVersion::PARQUET_1_0, 
properties->version());
+    }
+    {  // Only the generic file level is set: it is expected to be used (4).
+        std::map<std::string, std::string> options;
+        options[Options::FILE_COMPRESSION_ZSTD_LEVEL] = "4";
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024, 
options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("zstd"));
+        ASSERT_EQ(4, 
properties->default_column_properties().compression_level());
+    }
+}
+
+TEST(ParquetWriterBuilderTest, TestPrepareWriterPropertiesFileCompression) {  // One scoped case per codec name: checks the Arrow codec (and level, where set) that ends up in the properties.
+    arrow::FieldVector fields;
+    std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+    {  // "lz4" is expected to resolve to LZ4_FRAME.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("lz4"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::LZ4_FRAME);
+    }
+    {  // Mixed-case "lz4_raW" is expected to resolve to LZ4.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("lz4_raW"));
+        ASSERT_EQ(properties->default_column_properties().compression(), 
arrow::Compression::LZ4);
+    }
+    {  // "zstd" -> ZSTD.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("zstd"));
+        ASSERT_EQ(properties->default_column_properties().compression(), 
arrow::Compression::ZSTD);
+    }
+    {  // Upper-case "LZ4" is expected to behave like "lz4".
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("LZ4"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::LZ4_FRAME);
+    }
+    {  // Mixed-case "ZSTd" is expected to behave like "zstd".
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("ZSTd"));
+        ASSERT_EQ(properties->default_column_properties().compression(), 
arrow::Compression::ZSTD);
+    }
+    {  // "gzip" -> GZIP, with the level read from the zlib level option.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        options[PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL] = "3";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("gzip"));
+        ASSERT_EQ(3, 
properties->default_column_properties().compression_level());
+        ASSERT_EQ(properties->default_column_properties().compression(), 
arrow::Compression::GZIP);
+    }
+    {  // "snappy" -> SNAPPY.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("snappy"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::SNAPPY);
+    }
+    {  // "lzo" -> LZO.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("lzo"));
+        ASSERT_EQ(properties->default_column_properties().compression(), 
arrow::Compression::LZO);
+    }
+    {  // An unrecognized codec name must produce a non-OK result.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_NOK(builder.PrepareWriterProperties("unknown"));
+    }
+    {  // "uncompressed" -> UNCOMPRESSED.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("uncompressed"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::UNCOMPRESSED);
+    }
+    {  // "lz4_hadoop" -> LZ4_HADOOP.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("lz4_hadoop"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::LZ4_HADOOP);
+    }
+    {  // "brotli" -> BROTLI, with the level read from the brotli level option.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        options[PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL] = "2";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("brotli"));
+        ASSERT_EQ(2, 
properties->default_column_properties().compression_level());
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::BROTLI);
+    }
+    {  // "None" is expected to mean UNCOMPRESSED.
+        std::map<std::string, std::string> options;
+        options[Options::FILE_FORMAT] = "parquet";
+        options[Options::MANIFEST_FORMAT] = "parquet";
+        ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+        ASSERT_OK_AND_ASSIGN(auto properties, 
builder.PrepareWriterProperties("None"));
+        ASSERT_EQ(properties->default_column_properties().compression(),
+                  arrow::Compression::UNCOMPRESSED);
+    }
+}
+
+TEST(ParquetWriterBuilderTest, TestInvalidWriterVersion) {  // An unsupported writer version string must fail with a message naming that string.
+    arrow::FieldVector fields;
+    std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
+    std::map<std::string, std::string> options;
+    options[Options::FILE_FORMAT] = "parquet";
+    options[Options::MANIFEST_FORMAT] = "parquet";
+    options[PARQUET_WRITER_VERSION] = "PARQUET_3_0";  // Not one of the strings ConvertWriterVersion accepts.
+    ParquetWriterBuilder builder(schema, /*batch_size=*/1024, options);
+    ASSERT_NOK_WITH_MSG(builder.PrepareWriterProperties("zstd"),
+                        "Unknown writer version PARQUET_3_0");
+}
+
+}  // namespace paimon::parquet::test


Reply via email to