This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 45d8609  feat: introduce data type with JSON serialization
45d8609 is described below

commit 45d8609d84a63814ece92055bb8f938a7d91d823
Author: lszskye <[email protected]>
AuthorDate: Mon May 25 19:23:27 2026 +0800

    feat: introduce data type with JSON serialization
---
 src/paimon/common/types/array_type.h               |  57 ++
 src/paimon/common/types/data_field.cpp             | 179 ++++++
 src/paimon/common/types/data_field.h               | 109 ++++
 src/paimon/common/types/data_field_test.cpp        | 313 ++++++++++
 src/paimon/common/types/data_type.cpp              | 134 ++++
 src/paimon/common/types/data_type.h                |  65 ++
 src/paimon/common/types/data_type_json_parser.cpp  | 685 +++++++++++++++++++++
 src/paimon/common/types/data_type_json_parser.h    |  59 ++
 .../common/types/data_type_json_parser_test.cpp    | 151 +++++
 src/paimon/common/types/data_type_test.cpp         | 101 +++
 src/paimon/common/types/map_type.h                 |  69 +++
 src/paimon/common/types/row_kind.cpp               |  46 ++
 src/paimon/common/types/row_kind.h                 | 149 +++++
 src/paimon/common/types/row_kind_test.cpp          |  46 ++
 src/paimon/common/types/row_type.cpp               |  86 +++
 src/paimon/common/types/row_type.h                 |  47 ++
 16 files changed, 2296 insertions(+)

diff --git a/src/paimon/common/types/array_type.h 
b/src/paimon/common/types/array_type.h
new file mode 100644
index 0000000..d03c0da
--- /dev/null
+++ b/src/paimon/common/types/array_type.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_type.h"
+#include "paimon/common/utils/rapidjson_util.h"
+
+namespace paimon {
+
+/// Paimon ARRAY type backed by an `arrow::ListType`.
+///
+/// Serializes to a JSON object with two members: "type" (the TYPE name with the
+/// nullability suffix produced by WithNullable) and "element" (the serialized
+/// element type).
+class ArrayType : public DataType {
+ public:
+    static constexpr char TYPE[] = "ARRAY";
+
+    /// @param type arrow type of the array; ToJson casts it to `arrow::ListType`.
+    /// @param nullable whether the array itself may be null.
+    /// @param metadata arrow key/value metadata forwarded to the DataType base.
+    ArrayType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+              const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
+        : DataType(type, nullable, metadata) {}
+
+    /// Builds the JSON object describing this array type.
+    ///
+    /// @param allocator rapidjson allocator used for every value created here.
+    /// @return object holding the "type" and "element" members.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override {
+        rapidjson::Value obj(rapidjson::kObjectType);
+        obj.AddMember(
+            rapidjson::StringRef("type"),
+            RapidJsonUtil::SerializeValue(WithNullable(std::string(TYPE)), allocator).Move(),
+            *allocator);
+
+        // Read-only access is all that is needed, so cast to a const reference.
+        const auto& list_type = arrow::internal::checked_cast<const arrow::ListType&>(*type_);
+        const auto& value_field = list_type.value_field();
+
+        // The element description is built from the list's value field; metadata is passed as
+        // nullptr. Nothing shares ownership of the temporary element type, so the unique_ptr
+        // returned by the factory is kept as-is instead of being converted to a shared_ptr.
+        const std::unique_ptr<DataType> element_type =
+            DataType::Create(value_field->type(), value_field->nullable(), /*metadata=*/nullptr);
+        obj.AddMember(rapidjson::StringRef("element"),
+                      RapidJsonUtil::SerializeValue(*element_type, allocator).Move(), *allocator);
+        return obj;
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_field.cpp 
b/src/paimon/common/types/data_field.cpp
new file mode 100644
index 0000000..4aa1092
--- /dev/null
+++ b/src/paimon/common/types/data_field.cpp
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_field.h"
+
+#include <cassert>
+#include <map>
+#include <stdexcept>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/api.h"
+#include "fmt/format.h"
+#include "paimon/common/types/data_type.h"
+#include "paimon/common/types/data_type_json_parser.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Two fields are equal when their ids, descriptions and arrow fields match.
+/// Arrow metadata is ignored when comparing the arrow fields.
+///
+/// A DataField whose arrow field has not been set (for example a default-constructed
+/// instance) only compares equal to another DataField that also has no arrow field.
+bool DataField::operator==(const DataField& other) const {
+    if (id_ != other.id_ || description_ != other.description_) {
+        return false;
+    }
+    // arrow::Field::Equals dereferences both sides, so null fields must be handled here.
+    if (field_ == nullptr || other.field_ == nullptr) {
+        return field_ == other.field_;
+    }
+    return field_->Equals(other.field_, /*check_metadata=*/false);
+}
+
+/// Serializes this field as a JSON object with the members "id", "name", "type" and,
+/// only when a description is present, "description" (added in that order).
+///
+/// @param allocator rapidjson allocator used for every value created here.
+/// @return the JSON object describing this field.
+rapidjson::Value DataField::ToJson(rapidjson::Document::AllocatorType* allocator) const
+    noexcept(false) {
+    rapidjson::Value json(rapidjson::kObjectType);
+
+    auto id_value = RapidJsonUtil::SerializeValue(id_, allocator);
+    json.AddMember(rapidjson::StringRef("id"), id_value.Move(), *allocator);
+
+    auto name_value = RapidJsonUtil::SerializeValue(field_->name(), allocator);
+    json.AddMember(rapidjson::StringRef("name"), name_value.Move(), *allocator);
+
+    // The type is serialized through the DataType wrapper built from the arrow field.
+    std::shared_ptr<DataType> wrapped_type =
+        DataType::Create(field_->type(), field_->nullable(), field_->metadata());
+    auto type_value = RapidJsonUtil::SerializeValue(*wrapped_type, allocator);
+    json.AddMember(rapidjson::StringRef("type"), type_value.Move(), *allocator);
+
+    if (description_.has_value()) {
+        auto description_value = RapidJsonUtil::SerializeValue(*description_, allocator);
+        json.AddMember(rapidjson::StringRef("description"), description_value.Move(),
+                       *allocator);
+    }
+    return json;
+}
+
+/// Populates this field from a JSON object with the members "id", "name", "type" and an
+/// optional "description".
+///
+/// The members of this object are only overwritten once every step has succeeded, so a
+/// failed call leaves the previous state untouched.
+///
+/// @param obj JSON value to read; must be an object.
+/// @throws std::invalid_argument if `obj` is not an object, if "type" is missing, or if
+///         the type cannot be parsed. Anything thrown by
+///         RapidJsonUtil::DeserializeKeyValue propagates unchanged.
+void DataField::FromJson(const rapidjson::Value& obj) noexcept(false) {
+    // Check the shape before the first member access; HasMember/operator[] are only
+    // meaningful on objects, and an assert would vanish in release builds.
+    if (!obj.IsObject()) {
+        throw std::invalid_argument("data field json must be an object");
+    }
+    const auto id = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, "id");
+    const auto name = RapidJsonUtil::DeserializeKeyValue<std::string>(obj, "name");
+    if (!obj.HasMember("type")) {
+        throw std::invalid_argument("key 'type' must exist");
+    }
+    auto field_result = DataTypeJsonParser::ParseType(name, obj["type"]);
+    if (!field_result.ok()) {
+        throw std::invalid_argument(
+            fmt::format("parse data type failed, error msg: {}", field_result.status().ToString()));
+    }
+    auto field = field_result.value();
+    assert(field);
+    // A missing "description" keeps the current description as the default.
+    auto description = RapidJsonUtil::DeserializeKeyValue<std::optional<std::string>>(
+        obj, "description", description_);
+
+    // Commit only after nothing can throw any more.
+    id_ = id;
+    field_ = std::move(field);
+    description_ = std::move(description);
+}
+
+/// Builds an arrow field with the same name, type and nullability as `field`, whose
+/// metadata is the wrapped arrow field's metadata plus the field id (under FIELD_ID) and,
+/// when present and non-empty, the description (under DESCRIPTION).
+std::shared_ptr<arrow::Field> DataField::ConvertDataFieldToArrowField(const DataField& field) {
+    // Start from whatever metadata the wrapped arrow field already carries.
+    std::unordered_map<std::string, std::string> key_values;
+    const std::shared_ptr<arrow::Field>& wrapped = field.field_;
+    if (wrapped->HasMetadata()) {
+        wrapped->metadata()->ToUnorderedMap(&key_values);
+    }
+
+    key_values[std::string(DataField::FIELD_ID)] = std::to_string(field.Id());
+
+    // An empty description is treated like a missing one and is not written.
+    const std::optional<std::string>& description = field.Description();
+    if (description.has_value() && !description->empty()) {
+        key_values[DataField::DESCRIPTION] = *description;
+    }
+
+    auto arrow_metadata = std::make_shared<arrow::KeyValueMetadata>(key_values);
+    return std::make_shared<arrow::Field>(field.Name(), field.Type(), field.Nullable(),
+                                          arrow_metadata);
+}
+
+/// Converts every data field with ConvertDataFieldToArrowField and wraps the results, in
+/// input order, into an arrow struct type.
+std::shared_ptr<arrow::DataType> DataField::ConvertDataFieldsToArrowStructType(
+    const std::vector<DataField>& data_fields) {
+    const size_t count = data_fields.size();
+    arrow::FieldVector children;
+    children.reserve(count);
+    for (size_t i = 0; i < count; ++i) {
+        children.push_back(ConvertDataFieldToArrowField(data_fields[i]));
+    }
+    return arrow::struct_(children);
+}
+
+/// Builds an arrow schema whose fields are the children of the struct type produced by
+/// ConvertDataFieldsToArrowStructType.
+std::shared_ptr<arrow::Schema> DataField::ConvertDataFieldsToArrowSchema(
+    const std::vector<DataField>& data_fields) {
+    const std::shared_ptr<arrow::DataType> struct_type =
+        ConvertDataFieldsToArrowStructType(data_fields);
+    return arrow::schema(struct_type->fields());
+}
+
+/// Converts every field of `schema` with ConvertArrowFieldToDataField, preserving order.
+///
+/// @param schema arrow schema to convert.
+/// @return the converted fields, or an Invalid status if `schema` is null or any field
+///         fails to convert.
+Result<std::vector<DataField>> DataField::ConvertArrowSchemaToDataFields(
+    const std::shared_ptr<arrow::Schema>& schema) {
+    if (schema == nullptr) {
+        return Status::Invalid("invalid read schema, arrow schema is null");
+    }
+    std::vector<DataField> fields;
+    fields.reserve(schema->num_fields());
+    for (const auto& arrow_field : schema->fields()) {
+        PAIMON_ASSIGN_OR_RAISE(DataField field, ConvertArrowFieldToDataField(arrow_field));
+        // The local is not used again, so hand it over instead of copying it.
+        fields.push_back(std::move(field));
+    }
+    return fields;
+}
+
+/// Rebuilds a DataField from an arrow field whose metadata carries the field id under
+/// FIELD_ID and, optionally, a description under DESCRIPTION.
+///
+/// @return the DataField, or an error status when the metadata is missing, the id key is
+///         absent, or the id value cannot be parsed as an int32.
+Result<DataField> DataField::ConvertArrowFieldToDataField(
+    const std::shared_ptr<arrow::Field>& field) {
+    const bool metadata_missing = !field->HasMetadata() || !field->metadata();
+    if (metadata_missing) {
+        return Status::Invalid(fmt::format(
+            "invalid read schema, lack of metadata of field id, field name '{}'", field->name()));
+    }
+
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::string field_id_str,
+                                      field->metadata()->Get(DataField::FIELD_ID));
+    const std::optional<int32_t> parsed_id = StringUtils::StringToValue<int32_t>(field_id_str);
+    if (!parsed_id.has_value()) {
+        return Status::Invalid(
+            fmt::format("invalid read schema, cannot cast field id {} to int, field name '{}'",
+                        field_id_str, field->name()));
+    }
+
+    // The description is optional: a failed lookup simply leaves it unset.
+    std::optional<std::string> description;
+    auto description_lookup = field->metadata()->Get(DataField::DESCRIPTION);
+    if (description_lookup.ok()) {
+        description = description_lookup.ValueUnsafe();
+    }
+
+    return DataField(*parsed_id, field, description);
+}
+
+/// Returns the id of every field, in the order of `fields`.
+std::vector<int32_t> DataField::GetAllFieldIds(const std::vector<DataField>& fields) {
+    const size_t count = fields.size();
+    std::vector<int32_t> field_ids;
+    field_ids.reserve(count);
+    for (size_t i = 0; i < count; ++i) {
+        field_ids.push_back(fields[i].Id());
+    }
+    return field_ids;
+}
+
+/// Selects fields by name.
+///
+/// @param fields source fields.
+/// @param projected_cols names to keep, in the desired output order; std::nullopt keeps
+///        every field.
+/// @return the selected fields, or an Invalid status naming the first requested column
+///         that is not present in `fields`.
+Result<std::vector<DataField>> DataField::ProjectFields(
+    const std::vector<DataField>& fields,
+    const std::optional<std::vector<std::string>>& projected_cols) {
+    std::vector<DataField> selected;
+    if (!projected_cols.has_value()) {
+        // No projection requested: return a copy of all fields.
+        selected = fields;
+        return selected;
+    }
+
+    // Maps each field name to its position in `fields`.
+    std::map<std::string, int32_t> name_to_index = ObjectUtils::CreateIdentifierToIndexMap(
+        fields, [](const DataField& field) -> std::string { return field.Name(); });
+    for (const std::string& column : *projected_cols) {
+        const auto found = name_to_index.find(column);
+        if (found == name_to_index.end()) {
+            return Status::Invalid(
+                fmt::format("projected field {} not in src field set", column));
+        }
+        selected.push_back(fields[found->second]);
+    }
+    return selected;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_field.h 
b/src/paimon/common/types/data_field.h
new file mode 100644
index 0000000..ada3ce9
--- /dev/null
+++ b/src/paimon/common/types/data_field.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/result.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+/// Defines the field of a row type.
+///
+/// A DataField pairs a numeric field id and an optional description with an
+/// `arrow::Field`, which supplies the name, type and nullability.
+class DataField : public Jsonizable<DataField> {
+ public:
+    /// Creates a field without a description.
+    DataField(int32_t id, const std::shared_ptr<arrow::Field>& field)
+        : DataField(id, field, std::nullopt) {}
+
+    /// Creates a field with an optional description.
+    DataField(int32_t id, const std::shared_ptr<arrow::Field>& field,
+              const std::optional<std::string>& description)
+        : id_(id), field_(field), description_(description) {}
+
+    // Arrow metadata keys under which the field id and the description are stored.
+    static constexpr char FIELD_ID[] = "paimon.id";
+    static constexpr char DESCRIPTION[] = "paimon.description";
+
+ public:
+    /// Converts a DataField into an arrow field carrying the id/description as metadata.
+    static std::shared_ptr<arrow::Field> ConvertDataFieldToArrowField(const 
DataField& field);
+
+    /// Converts the fields into an arrow struct type, one child per DataField.
+    static std::shared_ptr<arrow::DataType> ConvertDataFieldsToArrowStructType(
+        const std::vector<DataField>& data_fields);
+
+    /// Converts the fields into an arrow schema, one schema field per DataField.
+    static std::shared_ptr<arrow::Schema> ConvertDataFieldsToArrowSchema(
+        const std::vector<DataField>& data_fields);
+
+    /// Converts every field of an arrow schema back into a DataField.
+    static Result<std::vector<DataField>> ConvertArrowSchemaToDataFields(
+        const std::shared_ptr<arrow::Schema>& schema);
+
+    /// Converts one arrow field back into a DataField; the result is an error when the
+    /// conversion fails.
+    static Result<DataField> ConvertArrowFieldToDataField(
+        const std::shared_ptr<arrow::Field>& field);
+
+    /// Returns the ids of the given fields.
+    static std::vector<int32_t> GetAllFieldIds(const std::vector<DataField>& 
fields);
+
+    /// Selects fields by column name; the result is an error when the projection fails.
+    static Result<std::vector<DataField>> ProjectFields(
+        const std::vector<DataField>& fields,
+        const std::optional<std::vector<std::string>>& projected_cols);
+
+    /// Field id.
+    int32_t Id() const {
+        return id_;
+    }
+
+    /// Name of the wrapped arrow field. Dereferences field_, which must be set.
+    const std::string& Name() const {
+        return field_->name();
+    }
+
+    /// Arrow type of the wrapped arrow field. Dereferences field_, which must be set.
+    const std::shared_ptr<arrow::DataType>& Type() const {
+        return field_->type();
+    }
+
+    /// The wrapped arrow field itself.
+    const std::shared_ptr<arrow::Field>& ArrowField() const {
+        return field_;
+    }
+
+    /// Optional description of the field.
+    const std::optional<std::string>& Description() const {
+        return description_;
+    }
+
+    /// Nullability of the wrapped arrow field. Dereferences field_, which must be set.
+    bool Nullable() const {
+        return field_->nullable();
+    }
+
+    /// Serializes this field to a JSON value allocated from `allocator`.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+        noexcept(false) override;
+
+    /// Populates this field from a JSON value.
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+    bool operator==(const DataField& other) const;
+
+ private:
+    // NOTE(review): presumably declares the Jsonizable friend and a default constructor;
+    // confirm against the macro definition in jsonizable.h.
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(DataField);
+
+ private:
+    int32_t id_ = -1;                       // -1 until an id is assigned
+    std::shared_ptr<arrow::Field> field_;   // null until a field is assigned
+    std::optional<std::string> description_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_field_test.cpp 
b/src/paimon/common/types/data_field_test.cpp
new file mode 100644
index 0000000..6ac52ff
--- /dev/null
+++ b/src/paimon/common/types/data_field_test.cpp
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_field.h"
+
+#include <stdexcept>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon::test {
+
+// Fixture providing three fields shared by the tests below:
+//   field1_: id 1, int32 "field1" with description "description1"
+//   field2_: id 2, utf8 "field2" without description
+//   field3_: id 0, struct "field3" whose children are the two arrow fields above
+class DataFieldTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        auto arrow_field1 = arrow::field("field1", arrow::int32());
+        auto arrow_field2 = arrow::field("field2", arrow::utf8());
+
+        field1_ = DataField(1, arrow_field1, "description1");
+        field2_ = DataField(2, arrow_field2);
+
+        auto arrow_field3 = arrow::field("field3", 
arrow::struct_({arrow_field1, arrow_field2}));
+
+        field3_ = DataField(0, arrow_field3);
+    }
+
+    DataField field1_;
+    DataField field2_;
+    DataField field3_;
+};
+
+// The accessors return the id, name, arrow type and description given at construction.
+TEST_F(DataFieldTest, FieldAttributes) {
+    EXPECT_EQ(field1_.Id(), 1);
+    EXPECT_EQ(field1_.Name(), "field1");
+    EXPECT_EQ(field1_.Type()->id(), arrow::Type::INT32);
+    EXPECT_EQ(field1_.Description().value(), "description1");
+
+    EXPECT_EQ(field2_.Id(), 2);
+    EXPECT_EQ(field2_.Name(), "field2");
+    EXPECT_EQ(field2_.Type()->id(), arrow::Type::STRING);
+    EXPECT_EQ(field2_.Description(), std::nullopt);
+}
+
+// A copy compares equal to its source; two different fields do not compare equal.
+TEST_F(DataFieldTest, EqualityOperator) {
+    DataField field1_copy = field1_;
+    EXPECT_TRUE(field1_ == field1_copy);
+    EXPECT_FALSE(field1_ == field2_);
+}
+
+// The arrow field keeps name, type and nullability and stores the id as the string "1"
+// under the DataField::FIELD_ID metadata key.
+TEST_F(DataFieldTest, ConvertDataFieldToArrowField) {
+    auto arrow_field = DataField::ConvertDataFieldToArrowField(field1_);
+    EXPECT_EQ(arrow_field->name(), "field1");
+    EXPECT_EQ(arrow_field->type()->id(), arrow::Type::INT32);
+    EXPECT_TRUE(arrow_field->nullable());
+    EXPECT_EQ(arrow_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), 
"1");
+}
+
+// The struct type has one child per data field, in input order.
+TEST_F(DataFieldTest, ConvertDataFieldsToArrowStructType) {
+    std::vector<DataField> data_fields = {field1_, field2_};
+    auto arrow_struct_type = 
DataField::ConvertDataFieldsToArrowStructType(data_fields);
+    EXPECT_EQ(arrow_struct_type->num_fields(), 2);
+    EXPECT_EQ(arrow_struct_type->field(0)->name(), "field1");
+    EXPECT_EQ(arrow_struct_type->field(1)->name(), "field2");
+}
+
+// The schema has one field per data field, in input order.
+TEST_F(DataFieldTest, ConvertDataFieldsToArrowSchema) {
+    std::vector<DataField> data_fields = {field1_, field2_};
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    EXPECT_EQ(arrow_schema->num_fields(), 2);
+    EXPECT_EQ(arrow_schema->field(0)->name(), "field1");
+    EXPECT_EQ(arrow_schema->field(1)->name(), "field2");
+}
+
+// Round trip: data fields -> arrow schema -> data fields yields fields equal to the
+// originals.
+TEST_F(DataFieldTest, ConvertArrowSchemaToDataFields) {
+    std::vector<DataField> data_fields = {field1_, field2_};
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    ASSERT_OK_AND_ASSIGN(auto converted_data_fields,
+                         
DataField::ConvertArrowSchemaToDataFields(arrow_schema));
+    EXPECT_EQ(converted_data_fields.size(), 2);
+    EXPECT_EQ(converted_data_fields[0], field1_);
+    EXPECT_EQ(converted_data_fields[1], field2_);
+}
+
+// The ids come back in the same order as the input fields.
+TEST_F(DataFieldTest, GetAllFieldIds) {
+    std::vector<DataField> data_fields = {field1_, field2_, field3_};
+    auto fields_ids = DataField::GetAllFieldIds(data_fields);
+    ASSERT_EQ(fields_ids, std::vector<int32_t>({field1_.Id(), field2_.Id(), 
field3_.Id()}));
+}
+
+// Covers the success path plus two failure modes of ConvertArrowFieldToDataField.
+TEST_F(DataFieldTest, ConvertArrowFieldToDataField) {
+    {
+        // Round trip through an arrow field reproduces the original data field.
+        auto arrow_field = DataField::ConvertDataFieldToArrowField(field1_);
+        ASSERT_OK_AND_ASSIGN(auto converted_data_field,
+                             
DataField::ConvertArrowFieldToDataField(arrow_field));
+        ASSERT_EQ(converted_data_field, field1_);
+    }
+    {
+        // Metadata exists but has no paimon.id key: the key lookup error is reported.
+        std::vector<std::string> keys = {"invalid_field_id"};
+        std::vector<std::string> values = {"1"};
+        std::shared_ptr<arrow::KeyValueMetadata> meta = 
arrow::KeyValueMetadata::Make(keys, values);
+        auto arrow_field = arrow::field("field1", 
arrow::int32())->WithMetadata(meta);
+        
ASSERT_NOK_WITH_MSG(DataField::ConvertArrowFieldToDataField(arrow_field),
+                            "Key error: paimon.id");
+    }
+    {
+        // The id value is present but is not a valid integer.
+        std::vector<std::string> keys = {std::string(DataField::FIELD_ID)};
+        std::vector<std::string> values = {"1--"};
+        std::shared_ptr<arrow::KeyValueMetadata> meta = 
arrow::KeyValueMetadata::Make(keys, values);
+        auto arrow_field = arrow::field("field1", 
arrow::int32())->WithMetadata(meta);
+        
ASSERT_NOK_WITH_MSG(DataField::ConvertArrowFieldToDataField(arrow_field),
+                            "invalid read schema, cannot cast field id 1-- to 
int");
+    }
+}
+
+// Parses a ROW-typed field with DATE, BYTES and BLOB children and checks both the outer
+// field and each child after converting it back to a DataField.
+TEST_F(DataFieldTest, FromJson) {
+    const char* json = R"({
+    "id" : 0,
+    "name" : "f0",
+    "type" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "id" : 1,
+        "name" : "sub1",
+        "type" : "DATE"
+       }, {
+        "id" : 4,
+        "name" : "sub4",
+        "type" : "BYTES"
+       },
+       {
+        "id" : 5,
+        "name" : "sub5",
+        "type" : "BLOB"
+       }]
+    }
+})";
+    rapidjson::Document doc;
+    doc.Parse(json);
+    DataField field;
+    field.FromJson(doc);
+    EXPECT_EQ(field.Id(), 0);
+    EXPECT_EQ(field.Name(), "f0");
+    EXPECT_EQ(field.Type()->id(), arrow::Type::STRUCT);
+
+    // The children are plain arrow fields; convert them back to compare ids and types.
+    auto sub_fields = field.Type()->fields();
+    ASSERT_EQ(sub_fields.size(), 3);
+    ASSERT_OK_AND_ASSIGN(auto sub1, 
DataField::ConvertArrowFieldToDataField(sub_fields[0]));
+    ASSERT_EQ(sub1, DataField(1, arrow::field("sub1", arrow::date32())));
+    ASSERT_OK_AND_ASSIGN(auto sub4, 
DataField::ConvertArrowFieldToDataField(sub_fields[1]));
+    ASSERT_EQ(sub4, DataField(4, arrow::field("sub4", arrow::binary())));
+    ASSERT_OK_AND_ASSIGN(auto sub5, 
DataField::ConvertArrowFieldToDataField(sub_fields[2]));
+    ASSERT_TRUE(BlobUtils::IsBlobField(sub5.ArrowField()));
+}
+
+// Malformed input must make FromJson throw std::invalid_argument with a message that
+// contains the expected fragment.
+TEST_F(DataFieldTest, FromJsonFailed) {
+    // Parses json_str, runs FromJson and verifies that std::invalid_argument is thrown
+    // and that its message contains error_message; any other outcome fails the check.
+    auto check_result = [&](const std::string& json_str, const std::string& 
error_message) {
+        try {
+            rapidjson::Document doc;
+            doc.Parse(json_str);
+            DataField field;
+            field.FromJson(doc);
+            FAIL() << "Expected std::invalid_argument";
+        } catch (const std::invalid_argument& e) {
+            // Validate the exception type and message
+            std::string msg(e.what());
+            ASSERT_TRUE(msg.find(error_message) != std::string::npos) << 
e.what();
+        } catch (...) {
+            // Handle unexpected exception types
+            ASSERT_TRUE(false);
+        }
+    };
+    {
+        // Case 1: the nested field "sub1" has no "type" member.
+        std::string json_str = R"({
+    "id" : 0,
+    "name" : "f0",
+    "type" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "id" : 1,
+        "name" : "sub1"
+       }, {
+        "id" : 4,
+        "name" : "sub4",
+        "type" : "BYTES"
+       }]}
+})";
+        check_result(json_str, "key 'type' must exist");
+    }
+    {
+        // Case 2: the nested field "sub1" uses the type name "EMPTY_BYTES", which is
+        // expected to be rejected by the type parser.
+        std::string json_str = R"({
+    "id" : 0,
+    "name" : "f0",
+    "type" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "id" : 1,
+        "name" : "sub1",
+        "type" : "EMPTY_BYTES"
+       }, {
+        "id" : 4,
+        "name" : "sub4",
+        "type" : "BYTES"
+       }]}
+})";
+        check_result(json_str, "parse data type failed, error msg: ");
+    }
+}
+
+// Compares the exact JSON text produced by ToJsonString for a field with a description,
+// a field without one, and a struct field.
+TEST_F(DataFieldTest, ToJson) {
+    std::string expected_field1_json = R"({
+    "id": 1,
+    "name": "field1",
+    "type": "INT",
+    "description": "description1"
+})";
+
+    ASSERT_OK_AND_ASSIGN(std::string actual_field1_json, 
field1_.ToJsonString());
+    ASSERT_EQ(expected_field1_json, actual_field1_json);
+
+    // No "description" member is expected when the field has no description.
+    std::string expected_field2_json = R"({
+    "id": 2,
+    "name": "field2",
+    "type": "STRING"
+})";
+
+    ASSERT_OK_AND_ASSIGN(std::string actual_field2_json, 
field2_.ToJsonString());
+    ASSERT_EQ(expected_field2_json, actual_field2_json);
+
+    // NOTE(review): the nested fields are expected to serialize with id -1, presumably
+    // because the struct's child arrow fields carry no paimon.id metadata; confirm
+    // against the RowType serialization.
+    std::string expected_field3_json = R"({
+    "id": 0,
+    "name": "field3",
+    "type": {
+        "type": "ROW",
+        "fields": [
+            {
+                "id": -1,
+                "name": "field1",
+                "type": "INT"
+            },
+            {
+                "id": -1,
+                "name": "field2",
+                "type": "STRING"
+            }
+        ]
+    }
+})";
+    ASSERT_OK_AND_ASSIGN(std::string actual_field3_json, 
field3_.ToJsonString());
+    ASSERT_EQ(expected_field3_json, actual_field3_json);
+}
+
+// Exercises ProjectFields with no projection, a reordered subset, an empty projection
+// and an unknown column name.
+TEST_F(DataFieldTest, TestProjectFields) {
+    std::vector<DataField> fields = {
+        DataField(0, arrow::field("f0", arrow::boolean())),
+        DataField(1, arrow::field("f1", arrow::int8())),
+        DataField(2, arrow::field("f2", arrow::int16())),
+        DataField(3, arrow::field("f3", arrow::int32())),
+        DataField(4, arrow::field("f4", arrow::int64())),
+        DataField(5, arrow::field("f5", arrow::float32())),
+        DataField(6, arrow::field("f6", arrow::float64())),
+        DataField(7, arrow::field("f7", arrow::utf8())),
+        DataField(8, arrow::field("f8", arrow::binary())),
+        DataField(9, arrow::field("f9", 
arrow::timestamp(arrow::TimeUnit::NANO))),
+        DataField(10, arrow::field("f10", arrow::date32())),
+        DataField(11, arrow::field("f11", arrow::decimal128(2, 2))),
+    };
+    {
+        // std::nullopt keeps every field.
+        ASSERT_OK_AND_ASSIGN(std::vector<DataField> projected_fields,
+                             DataField::ProjectFields(fields, std::nullopt));
+        ASSERT_EQ(projected_fields, fields);
+    }
+    {
+        // The output follows the order of the requested names, not of the source fields.
+        ASSERT_OK_AND_ASSIGN(
+            std::vector<DataField> projected_fields,
+            DataField::ProjectFields(fields, std::vector<std::string>({"f0", 
"f2", "f10", "f4"})));
+        ASSERT_EQ(projected_fields,
+                  std::vector<DataField>({fields[0], fields[2], fields[10], 
fields[4]}));
+    }
+    {
+        // An empty (but present) projection yields no fields.
+        ASSERT_OK_AND_ASSIGN(std::vector<DataField> projected_fields,
+                             DataField::ProjectFields(fields, 
std::vector<std::string>({})));
+        ASSERT_TRUE(projected_fields.empty());
+    }
+    {
+        // An unknown column name is reported as an error.
+        ASSERT_NOK_WITH_MSG(
+            DataField::ProjectFields(fields, std::vector<std::string>({"f0", 
"non-exist"})),
+            "projected field non-exist not in src field set");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/types/data_type.cpp 
b/src/paimon/common/types/data_type.cpp
new file mode 100644
index 0000000..7b71f53
--- /dev/null
+++ b/src/paimon/common/types/data_type.cpp
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_type.h"
+
+#include <cstdint>
+#include <stdexcept>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/types/array_type.h"
+#include "paimon/common/types/map_type.h"
+#include "paimon/common/types/row_type.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Stores the arrow type, the nullability flag and the arrow key/value metadata as given.
+DataType::DataType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+                   const std::shared_ptr<const arrow::KeyValueMetadata>& 
metadata)
+    : type_(type), nullable_(nullable), metadata_(metadata) {}
+
+/// Factory that picks the wrapper class from the arrow type id: MapType for MAP,
+/// ArrayType for LIST, RowType for STRUCT and a plain DataType for everything else.
+std::unique_ptr<DataType> DataType::Create(
+    const std::shared_ptr<arrow::DataType>& type, bool nullable,
+    const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) {
+    const arrow::Type::type type_id = type->id();
+    if (type_id == arrow::Type::type::MAP) {
+        return std::make_unique<MapType>(type, nullable, metadata);
+    }
+    if (type_id == arrow::Type::type::LIST) {
+        return std::make_unique<ArrayType>(type, nullable, metadata);
+    }
+    if (type_id == arrow::Type::type::STRUCT) {
+        return std::make_unique<RowType>(type, nullable, metadata);
+    }
+    // `new` is spelled out here where the subclasses above use std::make_unique.
+    return std::unique_ptr<DataType>(new DataType(type, nullable, metadata));
+}
+
+/// Returns `type` unchanged for nullable types and `type` followed by " NOT NULL"
+/// otherwise.
+std::string DataType::WithNullable(const std::string& type) const {
+    return nullable_ ? type : type + " NOT NULL";
+}
+
+/// Serializes this type as a JSON string: the type name from DataTypeToString with the
+/// nullability suffix from WithNullable.
+rapidjson::Value DataType::ToJson(rapidjson::Document::AllocatorType* allocator) const {
+    const std::string type_name = DataTypeToString(type_);
+    return RapidJsonUtil::SerializeValue(WithNullable(type_name), allocator);
+}
+
+/// Deserialization is not supported on DataType.
+///
+/// @throws std::logic_error always.
+void DataType::FromJson([[maybe_unused]] const rapidjson::Value& obj) noexcept(false) {
+    // The parameter is required by the Jsonizable interface but is never read here.
+    throw std::logic_error("NotImplemented: DataType::FromJson");
+}
+
+/// Formats a timestamp type as "TIMESTAMP(p)" when it has no timezone and as
+/// "TIMESTAMP(p) WITH LOCAL TIME ZONE" when it has one, where p is the precision
+/// reported by DateTimeUtils::GetPrecisionFromType.
+std::string DataType::TimestampToString(const std::shared_ptr<arrow::TimestampType>& type) const {
+    const auto precision = DateTimeUtils::GetPrecisionFromType(type);
+    const bool has_timezone = !type->timezone().empty();
+    if (has_timezone) {
+        return fmt::format("TIMESTAMP({}) WITH LOCAL TIME ZONE", precision);
+    }
+    return fmt::format("TIMESTAMP({})", precision);
+}
+
+/// Maps an arrow type to its type name as written into the JSON schema.
+///
+/// @param type arrow type to describe.
+/// @return the type name, e.g. "INT", "STRING", "DECIMAL(p, s)" or "TIMESTAMP(p)".
+/// @throws std::invalid_argument if the decimal type fails DecimalUtils::CheckDecimalType
+///         or if the arrow type has no mapping here. LARGE_BINARY is only mapped (to
+///         "BLOB") when this type's metadata is blob metadata.
+std::string DataType::DataTypeToString(const std::shared_ptr<arrow::DataType>& type) const {
+    switch (type->id()) {
+        case arrow::Type::type::BOOL:
+            return "BOOLEAN";
+        case arrow::Type::type::INT8:
+            return "TINYINT";
+        case arrow::Type::type::INT16:
+            return "SMALLINT";
+        case arrow::Type::type::INT32:
+            return "INT";
+        case arrow::Type::type::INT64:
+            return "BIGINT";
+        case arrow::Type::type::FLOAT:
+            return "FLOAT";
+        case arrow::Type::type::DOUBLE:
+            return "DOUBLE";
+        case arrow::Type::type::STRING:
+            return "STRING";
+        case arrow::Type::type::BINARY:
+            return "BYTES";
+        case arrow::Type::type::DATE32:
+            return "DATE";
+        case arrow::Type::type::DECIMAL128: {
+            auto status = DecimalUtils::CheckDecimalType(*type);
+            if (!status.ok()) {
+                throw std::invalid_argument(status.ToString());
+            }
+            // One reference cast instead of two shared_ptr casts. Precision and scale are
+            // formatted as the signed values arrow reports; converting them to an unsigned
+            // type would print a negative scale as a huge positive number.
+            const auto& decimal_type =
+                arrow::internal::checked_cast<const arrow::Decimal128Type&>(*type);
+            return fmt::format("DECIMAL({}, {})", decimal_type.precision(),
+                               decimal_type.scale());
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            const auto& timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(type);
+            return TimestampToString(timestamp_type);
+        }
+        case arrow::Type::type::LARGE_BINARY: {
+            // TODO(xinyu): change binary to large binary?
+            if (BlobUtils::IsBlobMetadata(metadata_)) {
+                return "BLOB";
+            }
+            // A large binary without blob metadata is reported as unknown below.
+            [[fallthrough]];
+        }
+        default:
+            throw std::invalid_argument(fmt::format("unknown type {}", type->ToString()));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_type.h 
b/src/paimon/common/types/data_type.h
new file mode 100644
index 0000000..173960b
--- /dev/null
+++ b/src/paimon/common/types/data_type.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/common/utils/jsonizable.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace arrow {
+class DataType;
+class TimestampType;
+class KeyValueMetadata;
+}  // namespace arrow
+
+namespace paimon {
+
+/// Describes a column type as an Arrow data type plus a nullability flag and
+/// optional key/value metadata, and (via Jsonizable) converts it to and from
+/// JSON.
+class DataType : public Jsonizable<DataType> {
+ public:
+    /// Factory returning a uniquely owned DataType built from the given Arrow
+    /// type, nullability flag and metadata. The constructor is protected, so
+    /// this is the public way to obtain an instance.
+    static std::unique_ptr<DataType> Create(
+        const std::shared_ptr<arrow::DataType>& type, bool nullable,
+        const std::shared_ptr<const arrow::KeyValueMetadata>& metadata);
+
+    ~DataType() override = default;
+
+    /// Jsonizable overrides. Both are declared noexcept(false), i.e. they are
+    /// allowed to throw.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+        noexcept(false) override;
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+    /// NOTE(review): presumably decorates the type string `type` according to
+    /// nullable_ (e.g. appending "NOT NULL"); confirm in data_type.cpp.
+    std::string WithNullable(const std::string& type) const;
+
+ protected:
+    DataType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+             const std::shared_ptr<const arrow::KeyValueMetadata>& metadata);
+
+    std::shared_ptr<arrow::DataType> type_;
+    bool nullable_;
+    // Consulted by DataTypeToString to tell a BLOB apart from a plain
+    // LARGE_BINARY column.
+    std::shared_ptr<const arrow::KeyValueMetadata> metadata_;
+
+ private:
+    // Renders a timestamp type as its type-name string.
+    std::string TimestampToString(const std::shared_ptr<arrow::TimestampType>& 
type) const;
+    // Maps an Arrow type to its type-name string ("INT", "DECIMAL(p, s)",
+    // "BLOB", ...); throws std::invalid_argument for an unknown type.
+    std::string DataTypeToString(const std::shared_ptr<arrow::DataType>& type) 
const;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_type_json_parser.cpp 
b/src/paimon/common/types/data_type_json_parser.cpp
new file mode 100644
index 0000000..f4ab231
--- /dev/null
+++ b/src/paimon/common/types/data_type_json_parser.cpp
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_type_json_parser.h"
+
+#include <algorithm>
+#include <cctype>
+#include <cstddef>
+#include <cstdint>
+#include <exception>
+#include <map>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+
+namespace paimon {
+namespace {
+// Single-character delimiters recognised by the type-string tokenizer.
+// They live in an anonymous namespace, so no `static` is needed for
+// internal linkage.
+constexpr char CHAR_BEGIN_SUBTYPE = '<';     // opens a nested type list
+constexpr char CHAR_END_SUBTYPE = '>';       // closes a nested type list
+constexpr char CHAR_BEGIN_PARAMETER = '(';   // opens a parameter list
+constexpr char CHAR_END_PARAMETER = ')';     // closes a parameter list
+constexpr char CHAR_LIST_SEPARATOR = ',';    // separates list entries
+constexpr char CHAR_STRING = '\'';           // quotes a string literal
+constexpr char CHAR_IDENTIFIER = '`';        // quotes an identifier
+constexpr char CHAR_DOT = '.';               // separates identifier parts
+
+// Categories of tokens produced by Tokenize(). Values start at 1 and follow
+// declaration order; they are printed as integers in parser error messages.
+enum class TokenType : int32_t {
+    // e.g. "ROW<"
+    BEGIN_SUBTYPE = 1,
+    // e.g. "ROW<..>"
+    END_SUBTYPE,
+    // e.g. "CHAR("
+    BEGIN_PARAMETER,
+    // e.g. "CHAR(...)"
+    END_PARAMETER,
+    // e.g. "ROW<INT,"
+    LIST_SEPARATOR,
+    // e.g. "ROW<name INT 'Comment'"
+    LITERAL_STRING,
+    // e.g. "CHAR(12"
+    LITERAL_INT,
+    // e.g. "CHAR" or "TO"
+    KEYWORD,
+    // e.g. "ROW<name" or "myCatalog.myDatabase"
+    IDENTIFIER,
+    // e.g. "myCatalog.myDatabase."
+    IDENTIFIER_SEPARATOR
+};
+
+// One lexical token: its category, the cursor position recorded by the
+// tokenizer when it was emitted, and its text.
+struct Token {
+    TokenType type;
+    int32_t cursor_position;
+    std::string value;
+
+    Token(const TokenType& _type, int32_t _cursor_position,
+          const std::string& _value)
+        : type(_type), cursor_position(_cursor_position), value(_value) {}
+};
+
+// Returns an error Status (not a null pointer) when parsing fails.
+Result<std::shared_ptr<arrow::DataType>> ParseAtomicType(const std::string& 
str, bool* nullable,
+                                                         bool* is_blob);
+std::vector<Token> Tokenize(const std::string& chars);
+bool IsWhitespace(char character);
+bool IsDelimiter(char character);
+// True only for the ASCII decimal digits '0' through '9'.
+bool IsDigit(char c) {
+    return '0' <= c && c <= '9';
+}
+int32_t ConsumeEscaped(const std::string& chars, int32_t cursor, char 
delimiter,
+                       std::ostringstream& builder);
+int32_t ConsumeInt(const std::string& chars, int32_t cursor, 
std::ostringstream& builder);
+int32_t ConsumeIdentifier(const std::string& chars, int32_t cursor, 
std::ostringstream& builder);
+
+// Keywords recognised in type strings. Every enumerator except INVALID has a
+// matching entry in the Keywords() table; keep the two in sync when adding
+// one. INVALID is what TokenAsKeyword() returns for text not in that table.
+enum class Keyword : int32_t {
+    INVALID = 0,
+    CHAR,
+    VARCHAR,
+    STRING,
+    BOOLEAN,
+    BINARY,
+    VARBINARY,
+    BYTES,
+    DECIMAL,
+    NUMERIC,
+    DEC,
+    TINYINT,
+    SMALLINT,
+    INT,
+    INTEGER,
+    BIGINT,
+    FLOAT,
+    DOUBLE,
+    PRECISION,
+    DATE,
+    TIME,
+    WITH,
+    WITHOUT,
+    LOCAL,
+    ZONE,
+    TIMESTAMP,
+    TIMESTAMP_LTZ,
+    INTERVAL,
+    YEAR,
+    MONTH,
+    DAY,
+    HOUR,
+    MINUTE,
+    SECOND,
+    TO,
+    ARRAY,
+    MULTISET,
+    MAP,
+    ROW,
+    BLOB,
+    // Spelled NULL_ because NULL is a macro in C++.
+    NULL_,
+    RAW,
+    LEGACY,
+    NOT
+};
+
+// Returns the lookup table from upper-case keyword text to its Keyword
+// value. The table is a function-local static, built on first use.
+const std::map<std::string, Keyword>& Keywords() {
+    using KeywordTable = std::map<std::string, Keyword>;
+    static const KeywordTable table{
+        // character and binary types
+        {"CHAR", Keyword::CHAR},
+        {"VARCHAR", Keyword::VARCHAR},
+        {"STRING", Keyword::STRING},
+        {"BOOLEAN", Keyword::BOOLEAN},
+        {"BINARY", Keyword::BINARY},
+        {"VARBINARY", Keyword::VARBINARY},
+        {"BYTES", Keyword::BYTES},
+        // numeric types
+        {"DECIMAL", Keyword::DECIMAL},
+        {"NUMERIC", Keyword::NUMERIC},
+        {"DEC", Keyword::DEC},
+        {"TINYINT", Keyword::TINYINT},
+        {"SMALLINT", Keyword::SMALLINT},
+        {"INT", Keyword::INT},
+        {"INTEGER", Keyword::INTEGER},
+        {"BIGINT", Keyword::BIGINT},
+        {"FLOAT", Keyword::FLOAT},
+        {"DOUBLE", Keyword::DOUBLE},
+        {"PRECISION", Keyword::PRECISION},
+        // date and time types and their modifiers
+        {"DATE", Keyword::DATE},
+        {"TIME", Keyword::TIME},
+        {"WITH", Keyword::WITH},
+        {"WITHOUT", Keyword::WITHOUT},
+        {"LOCAL", Keyword::LOCAL},
+        {"ZONE", Keyword::ZONE},
+        {"TIMESTAMP", Keyword::TIMESTAMP},
+        {"TIMESTAMP_LTZ", Keyword::TIMESTAMP_LTZ},
+        {"INTERVAL", Keyword::INTERVAL},
+        {"YEAR", Keyword::YEAR},
+        {"MONTH", Keyword::MONTH},
+        {"DAY", Keyword::DAY},
+        {"HOUR", Keyword::HOUR},
+        {"MINUTE", Keyword::MINUTE},
+        {"SECOND", Keyword::SECOND},
+        {"TO", Keyword::TO},
+        // nested types
+        {"ARRAY", Keyword::ARRAY},
+        {"MULTISET", Keyword::MULTISET},
+        {"MAP", Keyword::MAP},
+        {"ROW", Keyword::ROW},
+        {"BLOB", Keyword::BLOB},
+        // everything else
+        {"NULL", Keyword::NULL_},
+        {"RAW", Keyword::RAW},
+        {"LEGACY", Keyword::LEGACY},
+        {"NOT", Keyword::NOT},
+    };
+    return table;
+}
+
+// Parser over the token list produced by Tokenize(). It keeps its own copy
+// of the input string and tokens plus a cursor (current_token_) that starts
+// at -1, i.e. before the first token, and is advanced by NextToken().
+class TokenParser {
+ public:
+    TokenParser(const std::string& input_string, const std::vector<Token>& 
tokens)
+        : input_string_(input_string), tokens_(tokens) {}
+
+    // Entry point: parses the whole token list as one type. Writes the
+    // parsed nullability to *nullable and sets *is_blob for BLOB.
+    Result<std::shared_ptr<arrow::DataType>> ParseTokens(bool* nullable, bool* 
is_blob);
+
+ private:
+    // Token under the cursor. No bounds check: only call after a NextToken()
+    // that returned OK.
+    inline const Token& GetToken() const {
+        return tokens_[current_token_];
+    }
+    // Current token text as an int via std::stoi, which throws on text that
+    // is not a representable int.
+    int32_t TokenAsInt() const {
+        return std::stoi(GetToken().value);
+    }
+    Keyword TokenAsKeyword() const {
+        return TokenAsKeyword(GetToken());
+    }
+    // Looks the token text up in Keywords(); Keyword::INVALID if absent.
+    Keyword TokenAsKeyword(const Token& token) const {
+        auto iter = Keywords().find(token.value);
+        if (iter != Keywords().end()) {
+            return iter->second;
+        }
+        return Keyword::INVALID;
+    }
+    // True while at least one token lies after the cursor.
+    bool HasRemainingTokens() const {
+        return current_token_ + 1 < static_cast<int32_t>(tokens_.size());
+    }
+
+    // Advance the cursor by one token; the overloads additionally require
+    // the new token to have the given type / be the given keyword.
+    Status NextToken();
+    Status NextToken(TokenType type);
+    Status NextToken(Keyword keyword);
+    // Look-ahead without consuming: do the tokens after the cursor match the
+    // given sequence?
+    bool HasNextToken(const std::vector<TokenType>& types) const;
+    bool HasNextToken(const std::vector<Keyword>& keywords) const;
+    Result<bool> ParseNullability();
+    Result<std::shared_ptr<arrow::DataType>> ParseTypeWithNullability(bool* 
nullable,
+                                                                      bool* 
is_blob);
+    Result<std::shared_ptr<arrow::DataType>> ParseTypeByKeyword(bool* is_blob);
+    Result<int32_t> ParseStringLength();
+    template <typename T>
+    Result<std::shared_ptr<arrow::DataType>> ParseStringType();
+    Result<std::shared_ptr<arrow::DataType>> ParseDecimalType();
+    Result<std::shared_ptr<arrow::DataType>> ParseDoubleType();
+    Result<std::shared_ptr<arrow::DataType>> ParseTimestampType();
+    Result<std::shared_ptr<arrow::DataType>> ParseTimestampLtzType();
+    Result<int32_t> ParseOptionalPrecision(int32_t default_precision);
+
+ private:
+    std::string input_string_;
+    std::vector<Token> tokens_;
+    // Written by NextToken(); not read anywhere in this class declaration.
+    int32_t last_valid_token_ = -1;
+    // Index of the token under the cursor; -1 means "before the first".
+    int32_t current_token_ = -1;
+};
+
+// Tokenizes `str` and parses it as a single atomic (non-nested) type.
+//
+// @param str       type string, e.g. "DECIMAL(10, 2) NOT NULL"
+// @param nullable  out: nullability parsed from the string
+// @param is_blob   out: set to true when the type is BLOB
+// @return the Arrow type, or an Invalid status. Exceptions raised while
+//         parsing (e.g. by std::stoi on an oversized integer literal) are
+//         converted to a status that names the input and the reason.
+Result<std::shared_ptr<arrow::DataType>> ParseAtomicType(
+    const std::string& str, bool* nullable, bool* is_blob) {
+    try {
+        const std::vector<Token> tokens = Tokenize(str);
+        TokenParser converter(str, tokens);
+        return converter.ParseTokens(nullable, is_blob);
+    } catch (const std::exception& e) {
+        // Keep the original message as the prefix and append the details.
+        return Status::Invalid(
+            fmt::format("parse atomic type failed. type: '{}', reason: {}",
+                        str, e.what()));
+    } catch (...) {
+        // Not derived from std::exception: nothing more to report.
+        return Status::Invalid("parse atomic type failed.");
+    }
+}
+
+// Splits a type string such as "DECIMAL(10, 2) NOT NULL" into tokens.
+//
+// - Each delimiter character becomes a one-character token.
+// - '...' yields LITERAL_STRING and `...` yields IDENTIFIER; a doubled
+//   quote character inside stands for one literal quote.
+// - A run of digits yields LITERAL_INT.
+// - Any other run of non-delimiter characters is upper-cased and looked up
+//   in Keywords(): a hit yields KEYWORD (upper-cased text), a miss yields
+//   IDENTIFIER (original text).
+// - Whitespace between tokens is skipped.
+//
+// For multi-character tokens the stored position is the index of the
+// token's last consumed character.
+std::vector<Token> Tokenize(const std::string& chars) {
+    std::vector<Token> tokens;
+    std::ostringstream builder;
+    // Empties the shared scratch buffer and resets its state flags.
+    const auto reset_builder = [&builder]() {
+        builder.str("");
+        builder.clear();
+    };
+    for (size_t cursor = 0; cursor < chars.length(); cursor++) {
+        const char cur_char = chars[cursor];
+        // Token text for the single-character delimiters below. Built with
+        // std::string(1, c): std::to_string(c) would promote the char to
+        // int and produce its character code (e.g. "60" for '<').
+        const std::string symbol(1, cur_char);
+        switch (cur_char) {
+            case CHAR_BEGIN_SUBTYPE:
+                tokens.emplace_back(TokenType::BEGIN_SUBTYPE, cursor, symbol);
+                break;
+            case CHAR_END_SUBTYPE:
+                tokens.emplace_back(TokenType::END_SUBTYPE, cursor, symbol);
+                break;
+            case CHAR_BEGIN_PARAMETER:
+                tokens.emplace_back(TokenType::BEGIN_PARAMETER, cursor,
+                                    symbol);
+                break;
+            case CHAR_END_PARAMETER:
+                tokens.emplace_back(TokenType::END_PARAMETER, cursor, symbol);
+                break;
+            case CHAR_LIST_SEPARATOR:
+                tokens.emplace_back(TokenType::LIST_SEPARATOR, cursor,
+                                    symbol);
+                break;
+            case CHAR_DOT:
+                tokens.emplace_back(TokenType::IDENTIFIER_SEPARATOR, cursor,
+                                    symbol);
+                break;
+            case CHAR_STRING:
+                reset_builder();
+                cursor = ConsumeEscaped(chars, cursor, CHAR_STRING, builder);
+                tokens.emplace_back(TokenType::LITERAL_STRING, cursor,
+                                    builder.str());
+                break;
+            case CHAR_IDENTIFIER:
+                reset_builder();
+                cursor =
+                    ConsumeEscaped(chars, cursor, CHAR_IDENTIFIER, builder);
+                tokens.emplace_back(TokenType::IDENTIFIER, cursor,
+                                    builder.str());
+                break;
+            default: {
+                if (IsWhitespace(cur_char)) {
+                    continue;
+                }
+                reset_builder();
+                if (IsDigit(cur_char)) {
+                    cursor = ConsumeInt(chars, cursor, builder);
+                    tokens.emplace_back(TokenType::LITERAL_INT, cursor,
+                                        builder.str());
+                    break;
+                }
+                cursor = ConsumeIdentifier(chars, cursor, builder);
+                const std::string token = builder.str();
+                // Keywords are matched case-insensitively.
+                std::string normalized_token = token;
+                std::transform(normalized_token.begin(),
+                               normalized_token.end(),
+                               normalized_token.begin(), [](unsigned char c) {
+                                   return static_cast<char>(std::toupper(c));
+                               });
+                if (Keywords().count(normalized_token) > 0) {
+                    tokens.emplace_back(TokenType::KEYWORD, cursor,
+                                        normalized_token);
+                } else {
+                    tokens.emplace_back(TokenType::IDENTIFIER, cursor, token);
+                }
+                break;
+            }
+        }
+    }
+    return tokens;
+}
+
+// True when std::isspace classifies `character` as whitespace. The cast to
+// unsigned char keeps negative char values out of std::isspace.
+bool IsWhitespace(char character) {
+    const auto as_unsigned = static_cast<unsigned char>(character);
+    return std::isspace(as_unsigned) != 0;
+}
+
+// True when `character` ends an unquoted identifier or keyword: one of the
+// structural characters < > ( ) , . or any whitespace.
+bool IsDelimiter(char character) {
+    switch (character) {
+        case CHAR_BEGIN_SUBTYPE:
+        case CHAR_END_SUBTYPE:
+        case CHAR_BEGIN_PARAMETER:
+        case CHAR_END_PARAMETER:
+        case CHAR_LIST_SEPARATOR:
+        case CHAR_DOT:
+            return true;
+        default:
+            return IsWhitespace(character);
+    }
+}
+
+// Reads a quoted run that opens at chars[cursor] and appends its content to
+// `builder`. Two consecutive delimiters stand for one literal delimiter,
+// e.g. 'Hello '' World' yields "Hello ' World".
+//
+// @return index of the closing delimiter, or chars.length() when the run is
+//         never closed.
+int32_t ConsumeEscaped(const std::string& chars, int32_t cursor,
+                       char delimiter, std::ostringstream& builder) {
+    const auto end = static_cast<int32_t>(chars.length());
+    // Start just past the opening delimiter.
+    int32_t pos = cursor + 1;
+    while (pos < end) {
+        const char ch = chars[pos];
+        if (ch != delimiter) {
+            builder << ch;
+            ++pos;
+            continue;
+        }
+        const bool doubled = pos + 1 < end && chars[pos + 1] == delimiter;
+        if (!doubled) {
+            // Closing delimiter: stop on it.
+            break;
+        }
+        // Escaped delimiter: emit one and skip the pair.
+        builder << ch;
+        pos += 2;
+    }
+    return pos;
+}
+
+// Appends the run of digits starting at chars[cursor] to `builder`.
+//
+// @return index of the last digit consumed (cursor - 1 when there is none).
+int32_t ConsumeInt(const std::string& chars, int32_t cursor,
+                   std::ostringstream& builder) {
+    const auto length = static_cast<int32_t>(chars.length());
+    int32_t pos = cursor;
+    while (pos < length && IsDigit(chars[pos])) {
+        builder << chars[pos];
+        ++pos;
+    }
+    return pos - 1;
+}
+
+// Appends the run of non-delimiter characters starting at chars[cursor] to
+// `builder`.
+//
+// @return index of the last character consumed (cursor - 1 when there is
+//         none).
+int32_t ConsumeIdentifier(const std::string& chars, int32_t cursor,
+                          std::ostringstream& builder) {
+    const auto length = static_cast<int32_t>(chars.length());
+    int32_t pos = cursor;
+    while (pos < length && !IsDelimiter(chars[pos])) {
+        builder << chars[pos];
+        ++pos;
+    }
+    return pos - 1;
+}
+
+// Parses the whole token list as exactly one type. Any token left over
+// after the type and its nullability suffix is reported as an error.
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseTokens(
+    bool* nullable, bool* is_blob) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> parsed_type,
+                           ParseTypeWithNullability(nullable, is_blob));
+    if (!HasRemainingTokens()) {
+        return parsed_type;
+    }
+    // Step onto the first unconsumed token so the error can name it.
+    PAIMON_RETURN_NOT_OK(NextToken());
+    return Status::Invalid(
+        fmt::format("Unexpected token: {}", GetToken().value));
+}
+
+// Moves the cursor to the next token. The cursor is advanced even when the
+// call fails because the token list is exhausted.
+Status TokenParser::NextToken() {
+    ++current_token_;
+    const auto token_count = static_cast<int32_t>(tokens_.size());
+    if (current_token_ < token_count) {
+        last_valid_token_ = current_token_ - 1;
+        return Status::OK();
+    }
+    return Status::Invalid("Unexpected end.");
+}
+
+// Moves the cursor to the next token and requires it to be of type `type`.
+// The error message prints both token types as their integer values.
+Status TokenParser::NextToken(TokenType type) {
+    PAIMON_RETURN_NOT_OK(NextToken());
+    const TokenType actual = GetToken().type;
+    if (actual == type) {
+        return Status::OK();
+    }
+    return Status::Invalid(fmt::format("< {} > expected but was < {} >.",
+                                       static_cast<int32_t>(type),
+                                       static_cast<int32_t>(actual)));
+}
+
+// Moves the cursor to the next token and requires it to be the keyword
+// `keyword`. The error message prints the expected keyword as its integer
+// value and the actual token as text.
+Status TokenParser::NextToken(Keyword keyword) {
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::KEYWORD));
+    const Token& current = GetToken();
+    const bool is_known = Keywords().count(current.value) > 0;
+    if (is_known && TokenAsKeyword(current) == keyword) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("Keyword '{}' expected but was '{}'.",
+                    static_cast<int32_t>(keyword), current.value));
+}
+
+// Look-ahead: true when the tokens right after the cursor have exactly the
+// types listed in `types`, in order. Consumes nothing.
+bool TokenParser::HasNextToken(const std::vector<TokenType>& types) const {
+    // Index of the first token after the cursor; 0 before any token has
+    // been consumed (current_token_ == -1).
+    const auto first = static_cast<size_t>(current_token_ + 1);
+    if (first + types.size() > tokens_.size()) {
+        return false;
+    }
+    size_t offset = 0;
+    for (const TokenType expected : types) {
+        if (tokens_[first + offset].type != expected) {
+            return false;
+        }
+        ++offset;
+    }
+    return true;
+}
+
+// Look-ahead: true when the tokens right after the cursor are exactly the
+// keywords listed in `keywords`, in order. Consumes nothing.
+bool TokenParser::HasNextToken(const std::vector<Keyword>& keywords) const {
+    // Index of the first token after the cursor; 0 before any token has
+    // been consumed (current_token_ == -1).
+    const auto first = static_cast<size_t>(current_token_ + 1);
+    if (first + keywords.size() > tokens_.size()) {
+        return false;
+    }
+    size_t offset = 0;
+    for (const Keyword expected : keywords) {
+        const Token& candidate = tokens_[first + offset];
+        const bool matches = candidate.type == TokenType::KEYWORD &&
+                             TokenAsKeyword(candidate) == expected;
+        if (!matches) {
+            return false;
+        }
+        ++offset;
+    }
+    return true;
+}
+
+// Consumes an optional nullability suffix and returns the nullability:
+// "NOT NULL" -> false; an explicit "NULL" or no suffix at all -> true.
+Result<bool> TokenParser::ParseNullability() {
+    if (HasNextToken({Keyword::NOT, Keyword::NULL_})) {
+        PAIMON_RETURN_NOT_OK(NextToken(Keyword::NOT));
+        PAIMON_RETURN_NOT_OK(NextToken(Keyword::NULL_));
+        return false;
+    }
+    if (HasNextToken({Keyword::NULL_})) {
+        // Explicit "NULL": consume it; the answer is the same as implicit.
+        PAIMON_RETURN_NOT_OK(NextToken(Keyword::NULL_));
+    }
+    return true;
+}
+
+// Parses one type followed by its optional nullability suffix. The legacy
+// suffix notation ("<type> ARRAY" / "<type> MULTISET") is rejected.
+Result<std::shared_ptr<arrow::DataType>>
+TokenParser::ParseTypeWithNullability(bool* nullable, bool* is_blob) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> parsed_type,
+                           ParseTypeByKeyword(is_blob));
+    PAIMON_ASSIGN_OR_RAISE(*nullable, ParseNullability());
+    const bool has_collection_suffix = HasNextToken({Keyword::ARRAY}) ||
+                                       HasNextToken({Keyword::MULTISET});
+    if (!has_collection_suffix) {
+        return parsed_type;
+    }
+    return Status::NotImplemented("not support for old version schema");
+}
+
+// Consumes the leading type keyword and returns the matching Arrow type.
+// Keywords with parameters or trailing words delegate to the dedicated
+// Parse*Type helpers; keywords without a case here are reported as
+// unsupported. *is_blob is written only for BLOB.
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseTypeByKeyword(
+    bool* is_blob) {
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::KEYWORD));
+    const Keyword keyword = TokenAsKeyword();
+    switch (keyword) {
+        // fixed-width numerics
+        case Keyword::BOOLEAN:
+            return arrow::boolean();
+        case Keyword::TINYINT:
+            return arrow::int8();
+        case Keyword::SMALLINT:
+            return arrow::int16();
+        case Keyword::INT:
+        case Keyword::INTEGER:
+            return arrow::int32();
+        case Keyword::BIGINT:
+            return arrow::int64();
+        case Keyword::FLOAT:
+            return arrow::float32();
+        case Keyword::DOUBLE:
+            return ParseDoubleType();
+        case Keyword::DECIMAL:
+        case Keyword::NUMERIC:
+        case Keyword::DEC:
+            return ParseDecimalType();
+        // strings and binaries
+        case Keyword::STRING:
+            return arrow::utf8();
+        case Keyword::BYTES:
+            return arrow::binary();
+        case Keyword::BLOB:
+            *is_blob = true;
+            return arrow::large_binary();
+        // dates and timestamps
+        case Keyword::DATE:
+            return arrow::date32();
+        case Keyword::TIMESTAMP:
+            return ParseTimestampType();
+        case Keyword::TIMESTAMP_LTZ:
+            return ParseTimestampLtzType();
+        default:
+            break;
+    }
+    return Status::Invalid(
+        fmt::format("Unsupported type: {}", GetToken().value));
+}
+
+// Parses an optional "(<int>)" length suffix.
+//
+// @return the explicit length, or -1 when no suffix is present.
+Result<int32_t> TokenParser::ParseStringLength() {
+    if (!HasNextToken({TokenType::BEGIN_PARAMETER})) {
+        // implicit length
+        return -1;
+    }
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::BEGIN_PARAMETER));
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::LITERAL_INT));
+    const int32_t explicit_length = TokenAsInt();
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::END_PARAMETER));
+    return explicit_length;
+}
+
+// Parses an optional "(<length>)" suffix and returns a default-constructed
+// Arrow type T. The parsed length is consumed for syntax only and is then
+// discarded (hence [[maybe_unused]]).
+// NOTE(review): no call site of this template is visible in this part of
+// the file; confirm it is still needed.
+template <typename T>
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseStringType() {
+    PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] int32_t length, 
ParseStringLength());
+    return std::make_shared<T>();
+}
+
+// Parses the optional "(precision[, scale])" suffix of DECIMAL / NUMERIC /
+// DEC. A missing precision or scale falls back to Decimal::DEFAULT_PRECISION
+// / Decimal::DEFAULT_SCALE.
+//
+// @return a decimal128 type, or Invalid when the syntax is wrong or the
+//         precision lies outside the range Arrow's Decimal128Type accepts.
+//
+// The precision comes straight from schema JSON, so it is range-checked
+// here and reported as a Status instead of being handed to
+// arrow::decimal128() unchecked.
+// NOTE(review): in the Arrow releases I am aware of, the Decimal128Type
+// constructor enforces this range with a fatal check rather than an error
+// return; confirm against the Arrow version this project pins.
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseDecimalType() {
+    int32_t precision = Decimal::DEFAULT_PRECISION;
+    int32_t scale = Decimal::DEFAULT_SCALE;
+    if (HasNextToken({TokenType::BEGIN_PARAMETER})) {
+        PAIMON_RETURN_NOT_OK(NextToken(TokenType::BEGIN_PARAMETER));
+        PAIMON_RETURN_NOT_OK(NextToken(TokenType::LITERAL_INT));
+        precision = TokenAsInt();
+        if (HasNextToken({TokenType::LIST_SEPARATOR})) {
+            PAIMON_RETURN_NOT_OK(NextToken(TokenType::LIST_SEPARATOR));
+            PAIMON_RETURN_NOT_OK(NextToken(TokenType::LITERAL_INT));
+            scale = TokenAsInt();
+        }
+        PAIMON_RETURN_NOT_OK(NextToken(TokenType::END_PARAMETER));
+    }
+    // Copied into locals so the class constants are not bound by reference
+    // inside fmt::format.
+    const int32_t min_precision = arrow::Decimal128Type::kMinPrecision;
+    const int32_t max_precision = arrow::Decimal128Type::kMaxPrecision;
+    if (precision < min_precision || precision > max_precision) {
+        return Status::Invalid(fmt::format(
+            "Decimal precision must be between {} and {} (both inclusive), "
+            "but was {}.",
+            min_precision, max_precision, precision));
+    }
+    return arrow::decimal128(precision, scale);
+}
+
+// DOUBLE may be followed by the optional word PRECISION ("DOUBLE
+// PRECISION"); both spellings yield float64.
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseDoubleType() {
+    const bool has_precision_word = HasNextToken({Keyword::PRECISION});
+    if (has_precision_word) {
+        PAIMON_RETURN_NOT_OK(NextToken(Keyword::PRECISION));
+    }
+    return arrow::float64();
+}
+
+// Parses what follows the TIMESTAMP keyword: an optional "(precision)" and
+// then an optional zone clause.
+//   WITHOUT TIME ZONE     -> timestamp without time zone
+//   WITH LOCAL TIME ZONE  -> timestamp with (local) time zone
+// A bare WITH not followed by LOCAL is consumed but changes nothing here;
+// the tokens after it are left for the caller.
+Result<std::shared_ptr<arrow::DataType>> TokenParser::ParseTimestampType() {
+    PAIMON_ASSIGN_OR_RAISE(
+        int32_t precision,
+        ParseOptionalPrecision(Timestamp::DEFAULT_PRECISION));
+    bool is_local_zoned = false;
+    if (HasNextToken({Keyword::WITHOUT})) {
+        for (const Keyword expected :
+             {Keyword::WITHOUT, Keyword::TIME, Keyword::ZONE}) {
+            PAIMON_RETURN_NOT_OK(NextToken(expected));
+        }
+    } else if (HasNextToken({Keyword::WITH})) {
+        PAIMON_RETURN_NOT_OK(NextToken(Keyword::WITH));
+        if (HasNextToken({Keyword::LOCAL})) {
+            for (const Keyword expected :
+                 {Keyword::LOCAL, Keyword::TIME, Keyword::ZONE}) {
+                PAIMON_RETURN_NOT_OK(NextToken(expected));
+            }
+            is_local_zoned = true;
+        }
+    }
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<arrow::DataType> timestamp_type,
+        DateTimeUtils::GetTypeFromPrecision(precision, is_local_zoned));
+    return timestamp_type;
+}
+
+// Parses what follows the TIMESTAMP_LTZ keyword: an optional "(precision)".
+// The resulting type always carries a time zone.
+Result<std::shared_ptr<arrow::DataType>>
+TokenParser::ParseTimestampLtzType() {
+    PAIMON_ASSIGN_OR_RAISE(
+        int32_t precision,
+        ParseOptionalPrecision(Timestamp::DEFAULT_PRECISION));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<arrow::DataType> zoned_type,
+        DateTimeUtils::GetTypeFromPrecision(precision,
+                                            /*with_timezone=*/true));
+    return zoned_type;
+}
+
+// Parses an optional "(<int>)" precision suffix.
+//
+// @param default_precision value returned when no suffix is present
+// @return the explicit precision, or default_precision
+Result<int32_t> TokenParser::ParseOptionalPrecision(
+    int32_t default_precision) {
+    if (!HasNextToken({TokenType::BEGIN_PARAMETER})) {
+        return default_precision;
+    }
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::BEGIN_PARAMETER));
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::LITERAL_INT));
+    const int32_t explicit_precision = TokenAsInt();
+    PAIMON_RETURN_NOT_OK(NextToken(TokenType::END_PARAMETER));
+    return explicit_precision;
+}
+}  // namespace
+
+// Dispatches on the JSON shape of the type: a JSON object describes a
+// complex type (ARRAY / MAP / ROW), a JSON string describes an atomic type.
+// Any other JSON kind is rejected.
+Result<std::shared_ptr<arrow::Field>> DataTypeJsonParser::ParseType(
+    const std::string& name, const rapidjson::Value& type_json_value) {
+    if (type_json_value.IsObject()) {
+        return ParseComplexTypeField(name, type_json_value);
+    }
+    if (type_json_value.IsString()) {
+        return ParseAtomicTypeField(name, type_json_value);
+    }
+    return Status::Invalid("cannot parse data type");
+}
+
+// Builds the Arrow field for an atomic type given as a JSON string. BLOB
+// fields are created through BlobUtils::ToArrowField; every other type
+// becomes a plain arrow::field. The caller (ParseType) has already checked
+// that the value is a string.
+Result<std::shared_ptr<arrow::Field>>
+DataTypeJsonParser::ParseAtomicTypeField(
+    const std::string& name, const rapidjson::Value& type_json_value) {
+    bool is_nullable = true;
+    bool blob_requested = false;
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::DataType> atomic_type,
+                           ParseAtomicType(type_json_value.GetString(),
+                                           &is_nullable, &blob_requested));
+    if (!blob_requested) {
+        return arrow::field(name, atomic_type, is_nullable);
+    }
+    return BlobUtils::ToArrowField(name, is_nullable);
+}
+
+// Builds the Arrow field for a complex type given as a JSON object. The
+// object's "type" string selects the kind by prefix (ARRAY, MAP, ROW) and
+// may contain "NOT NULL" to mark the field non-nullable.
+//
+// @return the field, Invalid when "type" is missing, not a string or of an
+//         unknown kind, or NotImplemented for MULTISET.
+Result<std::shared_ptr<arrow::Field>>
+DataTypeJsonParser::ParseComplexTypeField(
+    const std::string& name, const rapidjson::Value& type_json_value) {
+    const auto type_member = type_json_value.FindMember("type");
+    if (type_member == type_json_value.MemberEnd()) {
+        return Status::Invalid("complex data type must have type");
+    }
+    // GetString() requires a string value; reject anything else up front
+    // so malformed schema JSON yields a Status.
+    if (!type_member->value.IsString()) {
+        return Status::Invalid("complex data type 'type' must be a string");
+    }
+
+    const std::string type_str = type_member->value.GetString();
+    const bool nullable = type_str.find("NOT NULL") == std::string::npos;
+
+    if (StringUtils::StartsWith(type_str, "ARRAY")) {
+        return ParseArrayType(name, type_json_value, nullable);
+    } else if (StringUtils::StartsWith(type_str, "MAP")) {
+        return ParseMapType(name, type_json_value, nullable);
+    } else if (StringUtils::StartsWith(type_str, "ROW")) {
+        return ParseRowType(name, type_json_value, nullable);
+    } else if (StringUtils::StartsWith(type_str, "MULTISET")) {
+        return Status::NotImplemented("MULTISET is not supported");
+    }
+
+    return Status::Invalid("unknown complex data type: " + type_str);
+}
+
+// Builds a list field named `name` from the JSON member "element". The
+// element is parsed recursively and its field is named "item".
+Result<std::shared_ptr<arrow::Field>> DataTypeJsonParser::ParseArrayType(
+    const std::string& name, const rapidjson::Value& type_json_value,
+    bool nullable) {
+    const bool has_element = type_json_value.HasMember("element");
+    if (!has_element) {
+        return Status::Invalid("array data type must have element");
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> item_field,
+                           ParseType("item", type_json_value["element"]));
+    const auto list_type = arrow::list(item_field);
+    return arrow::field(name, list_type, nullable);
+}
+
+// Builds a map field named `name` from the JSON members "key" and "value",
+// each parsed recursively.
+//
+// @return the field, or Invalid when either member is missing or the key
+//         type is nullable.
+Result<std::shared_ptr<arrow::Field>> DataTypeJsonParser::ParseMapType(
+    const std::string& name, const rapidjson::Value& type_json_value,
+    bool nullable) {
+    if (!type_json_value.HasMember("key") ||
+        !type_json_value.HasMember("value")) {
+        return Status::Invalid("map data type must have key and value");
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> key,
+                           ParseType("key", type_json_value["key"]));
+    // NOTE: Unlike Java Paimon, this C++ implementation does not support
+    // nullable keys in MapType. This is a limitation of Apache Arrow, which
+    // does not allow null keys in its MapType. As a result, we validate
+    // `nullable = false` for the map key.
+    if (key->nullable()) {
+        // The first sentence now ends with ". " so the two sentences no
+        // longer run together in the rendered message.
+        return Status::Invalid(fmt::format(
+            "Map field '{}' has a nullable key. "
+            "Map keys must be explicitly marked as NOT NULL in the schema "
+            "for paimon-cpp "
+            "because Apache Arrow does not support nullable map keys. "
+            "Please add 'NOT NULL' to the key type definition.",
+            name));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> value,
+                           ParseType("value", type_json_value["value"]));
+    return arrow::field(name, std::make_shared<arrow::MapType>(key, value),
+                        nullable);
+}
+
+// Builds a struct field named `name` from the JSON member "fields", which
+// is deserialized into DataField objects and converted to an Arrow struct
+// type.
+// NOTE(review): how DeserializeKeyValue reacts to a missing or malformed
+// "fields" member is not visible here; confirm whether it can throw.
+Result<std::shared_ptr<arrow::Field>> DataTypeJsonParser::ParseRowType(
+    const std::string& name, const rapidjson::Value& type_json_value,
+    bool nullable) {
+    using DataFields = std::vector<DataField>;
+    auto row_fields = RapidJsonUtil::DeserializeKeyValue<DataFields>(
+        type_json_value, "fields");
+
+    auto row_struct = DataField::ConvertDataFieldsToArrowStructType(row_fields);
+    return arrow::field(name, row_struct, nullable);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_type_json_parser.h 
b/src/paimon/common/types/data_type_json_parser.h
new file mode 100644
index 0000000..92cb5d5
--- /dev/null
+++ b/src/paimon/common/types/data_type_json_parser.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+
+namespace paimon {
+/// Static-only helper that turns the JSON description of a data type into an
+/// Arrow field. Both the constructor and the destructor are deleted, so the
+/// class cannot be instantiated.
+class DataTypeJsonParser {
+ public:
+    DataTypeJsonParser() = delete;
+    ~DataTypeJsonParser() = delete;
+
+    /// Parses a data type from a JSON value and returns an Arrow field 
representation.
+    ///
+    /// A JSON string is parsed as an atomic type; a JSON object is parsed as
+    /// a complex type (ARRAY / MAP / ROW).
+    ///
+    /// @param name The name of the field.
+    /// @param type_json_value The JSON value representing the type.
+    /// @return A Result containing the parsed Arrow field, or an error status 
if parsing fails.
+    static Result<std::shared_ptr<arrow::Field>> ParseType(const std::string& 
name,
+                                                           const 
rapidjson::Value& type_json_value);
+
+ private:
+    /// Handles a type given as a JSON string (e.g. "INT NOT NULL").
+    static Result<std::shared_ptr<arrow::Field>> ParseAtomicTypeField(
+        const std::string& name, const rapidjson::Value& type_json_value);
+    /// Handles a type given as a JSON object; dispatches on its "type"
+    /// member to one of the three helpers below.
+    static Result<std::shared_ptr<arrow::Field>> ParseComplexTypeField(
+        const std::string& name, const rapidjson::Value& type_json_value);
+
+    /// Each helper builds the field for one complex kind. `nullable` is the
+    /// nullability already derived from the "type" string by the caller.
+    static Result<std::shared_ptr<arrow::Field>> ParseArrayType(
+        const std::string& name, const rapidjson::Value& type_json_value, bool 
nullable);
+    static Result<std::shared_ptr<arrow::Field>> ParseMapType(
+        const std::string& name, const rapidjson::Value& type_json_value, bool 
nullable);
+    static Result<std::shared_ptr<arrow::Field>> ParseRowType(
+        const std::string& name, const rapidjson::Value& type_json_value, bool 
nullable);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/data_type_json_parser_test.cpp 
b/src/paimon/common/types/data_type_json_parser_test.cpp
new file mode 100644
index 0000000..25b2ef5
--- /dev/null
+++ b/src/paimon/common/types/data_type_json_parser_test.cpp
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_type_json_parser.h"
+
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon::test {
+
+// A JSON object with "type": "ARRAY" and an "element" entry must parse into a non-null field.
+TEST(DataTypeJsonParserTest, ParseTypeArrayTypeSuccess) {
+    static constexpr char kTypeJson[] = R"({
+        "type": "ARRAY",
+        "element": "INT"
+    })";
+    rapidjson::Document document;
+    document.Parse(kTypeJson);
+
+    const std::string field_name = "array_field";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> parsed,
+                         DataTypeJsonParser::ParseType(field_name, document));
+    ASSERT_NE(parsed, nullptr);
+}
+
+// A JSON object with "type": "MAP", a NOT NULL key and a value type must parse into a
+// non-null field.
+TEST(DataTypeJsonParserTest, ParseTypeMapTypeSuccess) {
+    static constexpr char kTypeJson[] = R"({
+        "type": "MAP",
+        "key": "STRING NOT NULL",
+        "value": "INT"
+    })";
+    rapidjson::Document document;
+    document.Parse(kTypeJson);
+
+    const std::string field_name = "map_field";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> parsed,
+                         DataTypeJsonParser::ParseType(field_name, document));
+    ASSERT_NE(parsed, nullptr);
+}
+
+// A JSON object with "type": "ROW" and a two-entry "fields" array must parse into a
+// non-null field.
+TEST(DataTypeJsonParserTest, ParseTypeRowTypeSuccess) {
+    static constexpr char kTypeJson[] = R"({
+      "type" : "ROW",
+      "fields" : [ {
+        "id" : 1,
+        "name" : "sub1",
+        "type" : "DATE"
+      }, {
+        "id" : 4,
+        "name" : "sub4",
+        "type" : "BYTES"
+      }]})";
+    rapidjson::Document document;
+    document.Parse(kTypeJson);
+
+    const std::string field_name = "row_field";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> parsed,
+                         DataTypeJsonParser::ParseType(field_name, document));
+    ASSERT_NE(parsed, nullptr);
+}
+
+// Feeds every listed atomic type string through ParseType and compares the resulting Arrow
+// type with the expected one, then checks that three invalid type strings are rejected.
+TEST(DataTypeJsonParserTest, ParseTypeAtomicTypeSuccess) {
+    // The zone-aware timestamp expectations are built with the local timezone name.
+    auto local_tz = DateTimeUtils::GetLocalTimezoneName();
+    // Type string -> Arrow type that ParseType must produce for it.
+    std::vector<std::pair<std::string, std::shared_ptr<arrow::DataType>>> expectations = {
+        {"BOOLEAN", arrow::boolean()},
+        {"TINYINT", arrow::int8()},
+        {"SMALLINT", arrow::int16()},
+        {"INT", arrow::int32()},
+        {"INTEGER", arrow::int32()},
+        {"BIGINT", arrow::int64()},
+        {"FLOAT", arrow::float32()},
+        {"DOUBLE", arrow::float64()},
+        {"DOUBLE PRECISION", arrow::float64()},
+        {"DEC", arrow::decimal128(10, 0)},
+        {"DEC(10)", arrow::decimal128(10, 0)},
+        {"DEC(10, 3)", arrow::decimal128(10, 3)},
+        {"DECIMAL", arrow::decimal128(10, 0)},
+        {"DECIMAL(10)", arrow::decimal128(10, 0)},
+        {"DECIMAL(10, 3)", arrow::decimal128(10, 3)},
+        {"NUMERIC", arrow::decimal128(10, 0)},
+        {"NUMERIC(10)", arrow::decimal128(10, 0)},
+        {"NUMERIC(10, 3)", arrow::decimal128(10, 3)},
+        {"TIMESTAMP(0)", arrow::timestamp(arrow::TimeUnit::SECOND)},
+        {"TIMESTAMP(3)", arrow::timestamp(arrow::TimeUnit::MILLI)},
+        {"TIMESTAMP(6)", arrow::timestamp(arrow::TimeUnit::MICRO)},
+        {"TIMESTAMP(9)", arrow::timestamp(arrow::TimeUnit::NANO)},
+        {"TIMESTAMP(9) WITHOUT TIME ZONE", arrow::timestamp(arrow::TimeUnit::NANO)},
+        {"TIMESTAMP(9) WITH", arrow::timestamp(arrow::TimeUnit::NANO)},
+        {"TIMESTAMP(9) WITH LOCAL TIME ZONE", arrow::timestamp(arrow::TimeUnit::NANO, local_tz)},
+        {"TIMESTAMP_LTZ(9)", arrow::timestamp(arrow::TimeUnit::NANO, local_tz)},
+        {"BYTES", arrow::binary()},
+        {"STRING", arrow::utf8()},
+    };
+
+    for (const auto& [type_name, expected_type] : expectations) {
+        // The JSON string value is copied into storage owned by this document's allocator.
+        rapidjson::Document doc;
+        rapidjson::Value type_value(type_name.data(), doc.GetAllocator());
+
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> field,
+                             DataTypeJsonParser::ParseType("field_name", type_value));
+        ASSERT_TRUE(field->type()->Equals(expected_type));
+    }
+
+    // A non-numeric length argument must be rejected.
+    {
+        rapidjson::Document scratch;
+        rapidjson::Value bad_value("VARCHAR(test)", scratch.GetAllocator());
+        ASSERT_NOK(DataTypeJsonParser::ParseType("field_name", bad_value));
+    }
+    // A timestamp precision outside 0/3/6/9 must be rejected with a specific message.
+    {
+        rapidjson::Document scratch;
+        rapidjson::Value bad_value("TIMESTAMP(4)", scratch.GetAllocator());
+        ASSERT_NOK_WITH_MSG(DataTypeJsonParser::ParseType("field_name", bad_value),
+                            "only support precision 0/3/6/9 in timestamp type");
+    }
+    // The same precision rule applies to the local-time-zone variant.
+    {
+        rapidjson::Document scratch;
+        rapidjson::Value bad_value("TIMESTAMP(8) WITH LOCAL TIME ZONE", scratch.GetAllocator());
+        ASSERT_NOK_WITH_MSG(DataTypeJsonParser::ParseType("field_name", bad_value),
+                            "only support precision 0/3/6/9 in timestamp type");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/types/data_type_test.cpp 
b/src/paimon/common/types/data_type_test.cpp
new file mode 100644
index 0000000..137cf85
--- /dev/null
+++ b/src/paimon/common/types/data_type_test.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/data_type.h"
+
+#include <stdexcept>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon::test {
+
+// DataType::Create must hand back a non-empty result for both nullability settings.
+TEST(DataTypeTest, Create) {
+    auto arrow_int = arrow::int32();
+    // Nullable first, then non-nullable.
+    for (const bool nullable : {true, false}) {
+        auto created = DataType::Create(arrow_int, nullable, /*metadata=*/nullptr);
+        ASSERT_TRUE(created);
+    }
+}
+
+// WithNullable leaves the type string untouched for a nullable DataType and appends
+// " NOT NULL" for a non-nullable one.
+TEST(DataTypeTest, WithNullable) {
+    auto arrow_int = arrow::int32();
+
+    DataType optional_type(arrow_int, /*nullable=*/true, /*metadata=*/nullptr);
+    ASSERT_EQ(optional_type.WithNullable("INT"), "INT");
+
+    DataType required_type(arrow_int, /*nullable=*/false, /*metadata=*/nullptr);
+    ASSERT_EQ(required_type.WithNullable("INT"), "INT NOT NULL");
+}
+
+// Checks the textual name DataTypeToString produces for each supported Arrow type and that
+// an unsupported Arrow type raises std::invalid_argument.
+TEST(DataTypeTest, DataTypeToString) {
+    // DataTypeToString is invoked on an instance; the instance's own type is irrelevant here.
+    DataType formatter(arrow::null(), true, /*metadata=*/nullptr);
+
+    const struct {
+        std::shared_ptr<arrow::DataType> arrow_type;
+        const char* expected;
+    } primitive_cases[] = {
+        {arrow::boolean(), "BOOLEAN"}, {arrow::int8(), "TINYINT"},  {arrow::int16(), "SMALLINT"},
+        {arrow::int32(), "INT"},       {arrow::int64(), "BIGINT"},  {arrow::float32(), "FLOAT"},
+        {arrow::float64(), "DOUBLE"},  {arrow::utf8(), "STRING"},   {arrow::binary(), "BYTES"},
+    };
+    for (const auto& c : primitive_cases) {
+        ASSERT_EQ(formatter.DataTypeToString(c.arrow_type), c.expected);
+    }
+
+    // BLOB is checked through a DataType constructed from the blob field's type, nullability
+    // and metadata.
+    {
+        std::shared_ptr<arrow::Field> blob = BlobUtils::ToArrowField("f2_blob", false);
+        DataType blob_formatter(blob->type(), blob->nullable(), blob->metadata());
+        ASSERT_EQ(blob_formatter.DataTypeToString(blob->type()), "BLOB");
+    }
+
+    const struct {
+        std::shared_ptr<arrow::DataType> arrow_type;
+        const char* expected;
+    } parameterized_cases[] = {
+        {arrow::date32(), "DATE"},
+        {arrow::decimal128(10, 2), "DECIMAL(10, 2)"},
+        {arrow::decimal128(18, 6), "DECIMAL(18, 6)"},
+        {arrow::timestamp(arrow::TimeUnit::NANO), "TIMESTAMP(9)"},
+        {arrow::timestamp(arrow::TimeUnit::MICRO), "TIMESTAMP(6)"},
+        {arrow::timestamp(arrow::TimeUnit::MILLI), "TIMESTAMP(3)"},
+        {arrow::timestamp(arrow::TimeUnit::SECOND), "TIMESTAMP(0)"},
+    };
+    for (const auto& c : parameterized_cases) {
+        ASSERT_EQ(formatter.DataTypeToString(c.arrow_type), c.expected);
+    }
+
+    // A timestamp carrying a timezone is rendered as the LOCAL TIME ZONE variant, both for
+    // the local timezone name and for an explicit "Asia/Shanghai".
+    auto local_tz = DateTimeUtils::GetLocalTimezoneName();
+    ASSERT_EQ(formatter.DataTypeToString(arrow::timestamp(arrow::TimeUnit::NANO, local_tz)),
+              "TIMESTAMP(9) WITH LOCAL TIME ZONE");
+    ASSERT_EQ(formatter.DataTypeToString(arrow::timestamp(arrow::TimeUnit::NANO, "Asia/Shanghai")),
+              "TIMESTAMP(9) WITH LOCAL TIME ZONE");
+
+    // date64 is used as an example of an Arrow type that has no mapping.
+    ASSERT_THROW(formatter.DataTypeToString(arrow::date64()), std::invalid_argument);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/types/map_type.h 
b/src/paimon/common/types/map_type.h
new file mode 100644
index 0000000..7749dfc
--- /dev/null
+++ b/src/paimon/common/types/map_type.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/types/data_type.h"
+#include "paimon/common/utils/rapidjson_util.h"
+
+namespace paimon {
+
+// NOTE:
+// Unlike Java Paimon, this C++ implementation does NOT allow nullable keys in MapType.
+// The restriction comes from Apache Arrow, which does not support nullable map keys.
+//
+// When using this class, make sure the key type is always non-nullable. A nullable key
+// results in incorrect behavior or runtime errors.
+class MapType : public DataType {
+ public:
+    /// Type name written into the "type" member of the serialized JSON.
+    static constexpr char TYPE[] = "MAP";
+
+    /// Passes `type`, `nullable` and `metadata` through to the `DataType` base.
+    MapType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+            const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
+        : DataType(type, nullable, metadata) {}
+
+    /// Serializes this map type as a JSON object with the members "type", "key" and "value".
+    ///
+    /// "key" and "value" are produced by wrapping the Arrow key/item fields in a `DataType`
+    /// (type and nullability only, no metadata) and serializing that.
+    ///
+    /// @param allocator RapidJSON allocator used for every value that is created.
+    /// @return The JSON object describing this type.
+    /// @throws std::invalid_argument if the wrapped Arrow type is missing or is not a map.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override {
+        // checked_cast is a plain static_cast when NDEBUG is defined, so the type id is
+        // verified up front instead of relying on the cast to detect a mismatch.
+        if (type_ == nullptr || type_->id() != arrow::Type::MAP) {
+            throw std::invalid_argument("type failed to cast to MapType");
+        }
+        const auto* map_type =
+            arrow::internal::checked_cast<const arrow::MapType*>(type_.get());
+
+        rapidjson::Value obj(rapidjson::kObjectType);
+        obj.AddMember(
+            rapidjson::StringRef("type"),
+            RapidJsonUtil::SerializeValue(WithNullable(std::string(TYPE)), allocator).Move(),
+            *allocator);
+
+        const auto key_field = map_type->key_field();
+        std::shared_ptr<DataType> key_data_type =
+            DataType::Create(key_field->type(), key_field->nullable(), /*metadata=*/nullptr);
+        obj.AddMember(rapidjson::StringRef("key"),
+                      RapidJsonUtil::SerializeValue(*key_data_type, allocator).Move(), *allocator);
+
+        const auto value_field = map_type->item_field();
+        std::shared_ptr<DataType> value_data_type =
+            DataType::Create(value_field->type(), value_field->nullable(), /*metadata=*/nullptr);
+        obj.AddMember(rapidjson::StringRef("value"),
+                      RapidJsonUtil::SerializeValue(*value_data_type, allocator).Move(),
+                      *allocator);
+        return obj;
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/row_kind.cpp 
b/src/paimon/common/types/row_kind.cpp
new file mode 100644
index 0000000..380d219
--- /dev/null
+++ b/src/paimon/common/types/row_kind.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/row_kind.h"
+
+#include <cstdint>
+
+namespace paimon {
+
+// Returns the INSERT kind ("+I", byte value 0). The instance is a function-local static, so
+// every call yields the same address.
+const RowKind* RowKind::Insert() {
+    static const RowKind instance("+I", "INSERT", std::int8_t{0});
+    return &instance;
+}
+
+// Returns the UPDATE_BEFORE kind ("-U", byte value 1). The instance is a function-local
+// static, so every call yields the same address.
+const RowKind* RowKind::UpdateBefore() {
+    static const RowKind instance("-U", "UPDATE_BEFORE", std::int8_t{1});
+    return &instance;
+}
+
+// Returns the UPDATE_AFTER kind ("+U", byte value 2). The instance is a function-local
+// static, so every call yields the same address.
+const RowKind* RowKind::UpdateAfter() {
+    static const RowKind instance("+U", "UPDATE_AFTER", std::int8_t{2});
+    return &instance;
+}
+
+// Returns the DELETE kind ("-D", byte value 3). The instance is a function-local static, so
+// every call yields the same address.
+const RowKind* RowKind::Delete() {
+    static const RowKind instance("-D", "DELETE", std::int8_t{3});
+    return &instance;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/row_kind.h 
b/src/paimon/common/types/row_kind.h
new file mode 100644
index 0000000..48616e2
--- /dev/null
+++ b/src/paimon/common/types/row_kind.h
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Lists all kinds of changes that a row can describe in a changelog.
+///
+/// The constructor is private; the canonical instances are obtained through `Insert()`,
+/// `UpdateBefore()`, `UpdateAfter()` and `Delete()`. Instances are still copyable, so all
+/// comparisons and predicates in this class work on values, never on object addresses.
+class RowKind {
+ public:
+    /// Returns a short string representation of this `RowKind`.
+    ///
+    /// <ul>
+    /// <li>"+I" represents `Insert()`.
+    /// <li>"-U" represents `UpdateBefore()`.
+    /// <li>"+U" represents `UpdateAfter()`.
+    /// <li>"-D" represents `Delete()`.
+    /// </ul>
+    const std::string& ShortString() const {
+        return short_string_;
+    }
+
+    /// Returns the long name of this `RowKind`, e.g. "INSERT" or "UPDATE_BEFORE".
+    const std::string& Name() const {
+        return name_;
+    }
+
+    /// Returns the byte value representation of this `RowKind`. The byte value is
+    /// used for serialization and deserialization.
+    ///
+    /// <ul>
+    /// <li>"0" represents `Insert()`.
+    /// <li>"1" represents `UpdateBefore()`.
+    /// <li>"2" represents `UpdateAfter()`.
+    /// <li>"3" represents `Delete()`.
+    /// </ul>
+    int8_t ToByteValue() const {
+        return value_;
+    }
+
+    /// Two kinds are equal when byte value, short string and name all match.
+    bool operator==(const RowKind& other) const {
+        // Same object: nothing to compare.
+        if (this == &other) {
+            return true;
+        }
+        return value_ == other.value_ && short_string_ == other.short_string_ &&
+               name_ == other.name_;
+    }
+
+    /// Negation of `operator==`.
+    bool operator!=(const RowKind& other) const {
+        return !(*this == other);
+    }
+
+    /// Insertion operation.
+    static const RowKind* Insert();
+
+    /// Update operation with the previous content of the updated row.
+    ///
+    /// This kind SHOULD occur together with `UpdateAfter()` for modelling an update
+    /// that needs to retract the previous row first. It is useful in cases of a non-idempotent
+    /// update, i.e., an update of a row that is not uniquely identifiable by a key.
+    static const RowKind* UpdateBefore();
+
+    /// Update operation with new content of the updated row.
+    ///
+    /// This kind CAN occur together with `UpdateBefore()` for modelling an update
+    /// that needs to retract the previous row first. OR it describes an idempotent update,
+    /// i.e., an update of a row that is uniquely identifiable by a key.
+    static const RowKind* UpdateAfter();
+
+    /// Deletion operation.
+    static const RowKind* Delete();
+
+    /// Is `UpdateBefore()` or `Delete()`.
+    ///
+    /// Compared by value so that a copy of a canonical instance is classified the same way
+    /// as the instance it was copied from.
+    bool IsRetract() const {
+        return *this == *UpdateBefore() || *this == *Delete();
+    }
+
+    /// Is `Insert()` or `UpdateAfter()`.
+    ///
+    /// Compared by value so that a copy of a canonical instance is classified the same way
+    /// as the instance it was copied from.
+    bool IsAdd() const {
+        return *this == *Insert() || *this == *UpdateAfter();
+    }
+
+    /// Creates a `RowKind` from the given byte value. Each `RowKind` has a
+    /// byte value representation.
+    ///
+    /// @return The matching canonical instance, or an Invalid status for any value other
+    /// than 0, 1, 2 or 3.
+    /// @see #ToByteValue() for mapping of byte value and `RowKind`.
+    static Result<const RowKind*> FromByteValue(int8_t value) {
+        switch (value) {
+            case 0:
+                return Insert();
+            case 1:
+                return UpdateBefore();
+            case 2:
+                return UpdateAfter();
+            case 3:
+                return Delete();
+            default:
+                // Widen to int32_t so the value is formatted as a number, not a character.
+                return Status::Invalid(fmt::format("Unsupported byte value {} for row kind.",
+                                                   static_cast<int32_t>(value)));
+        }
+    }
+
+    /// Creates a `RowKind` from the given short string.
+    ///
+    /// @return The matching canonical instance, or an Invalid status for any string other
+    /// than "+I", "-U", "+U" or "-D".
+    /// @see #ShortString() for mapping of string and `RowKind`.
+    static Result<const RowKind*> FromShortString(const std::string& value) {
+        if (value == "+I") {
+            return Insert();
+        }
+        if (value == "-U") {
+            return UpdateBefore();
+        }
+        if (value == "+U") {
+            return UpdateAfter();
+        }
+        if (value == "-D") {
+            return Delete();
+        }
+        return Status::Invalid(fmt::format("Unsupported short string {} for row kind.", value));
+    }
+
+ private:
+    /// Creates a `RowKind` with the given short string, name and byte value representation
+    /// of the `RowKind`.
+    RowKind(const std::string& short_string, const std::string& name, int8_t value)
+        : short_string_(short_string), name_(name), value_(value) {}
+
+    std::string short_string_;
+    std::string name_;
+    int8_t value_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/types/row_kind_test.cpp 
b/src/paimon/common/types/row_kind_test.cpp
new file mode 100644
index 0000000..b0ab0bb
--- /dev/null
+++ b/src/paimon/common/types/row_kind_test.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/row_kind.h"
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Looks up each kind by byte value, rejects an out-of-range value, and checks the short
+// string, the name and value equality of the returned kinds.
+TEST(RowKindTest, TestSimple) {
+    ASSERT_OK_AND_ASSIGN(const RowKind* kind_insert, RowKind::FromByteValue(0));
+    ASSERT_OK_AND_ASSIGN(const RowKind* kind_update_before, RowKind::FromByteValue(1));
+    ASSERT_OK_AND_ASSIGN(const RowKind* kind_update_after, RowKind::FromByteValue(2));
+    ASSERT_OK_AND_ASSIGN(const RowKind* kind_delete, RowKind::FromByteValue(3));
+    // 4 is not a defined byte value.
+    ASSERT_NOK(RowKind::FromByteValue(4));
+
+    ASSERT_EQ(*kind_insert, *kind_insert);
+
+    ASSERT_EQ(kind_insert->ShortString(), "+I");
+    ASSERT_EQ(kind_update_before->ShortString(), "-U");
+    ASSERT_EQ(kind_update_after->ShortString(), "+U");
+    ASSERT_EQ(kind_delete->ShortString(), "-D");
+
+    ASSERT_EQ(kind_insert->Name(), "INSERT");
+    ASSERT_EQ(kind_update_before->Name(), "UPDATE_BEFORE");
+    ASSERT_EQ(kind_update_after->Name(), "UPDATE_AFTER");
+    ASSERT_EQ(kind_delete->Name(), "DELETE");
+
+    ASSERT_FALSE(*kind_insert == *kind_delete);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/types/row_type.cpp 
b/src/paimon/common/types/row_type.cpp
new file mode 100644
index 0000000..f497cdf
--- /dev/null
+++ b/src/paimon/common/types/row_type.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/types/row_type.h"
+
+#include <cassert>
+#include <cstdint>
+#include <optional>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+// Hands all three arguments unchanged to the DataType base constructor.
+RowType::RowType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+                 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
+    : DataType(type, nullable, metadata) {}
+
+// Serializes this row type as a JSON object with the members "type" and "fields".
+//
+// Every child field of the wrapped Arrow struct type is turned into a DataField. The field
+// id is read from the child's DataField::FIELD_ID metadata entry and stays -1 when that
+// entry is absent; the description is taken from DataField::DESCRIPTION when present.
+//
+// Throws std::invalid_argument when the wrapped Arrow type is missing or is not a struct,
+// when FIELD_ID cannot be read, or when FIELD_ID does not parse as a 32-bit integer.
+rapidjson::Value RowType::ToJson(rapidjson::Document::AllocatorType* allocator) const
+    noexcept(false) {
+    // checked_cast is a plain static_cast when NDEBUG is defined, so the type id is verified
+    // up front instead of testing the cast result for nullptr.
+    if (type_ == nullptr || type_->id() != arrow::Type::STRUCT) {
+        throw std::invalid_argument("type failed to cast to StructType");
+    }
+    const auto* struct_type =
+        arrow::internal::checked_cast<const arrow::StructType*>(type_.get());
+
+    rapidjson::Value obj(rapidjson::kObjectType);
+    obj.AddMember(rapidjson::StringRef("type"),
+                  RapidJsonUtil::SerializeValue(WithNullable(TYPE), allocator).Move(), *allocator);
+
+    std::vector<DataField> fields;
+    fields.reserve(struct_type->fields().size());
+    for (const auto& field : struct_type->fields()) {
+        std::optional<std::string> description;
+        int32_t field_id = -1;
+        // HasMetadata() already implies a non-null metadata pointer.
+        if (field->HasMetadata()) {
+            const auto& metadata = field->metadata();
+            if (metadata->Contains(DataField::FIELD_ID)) {
+                auto field_id_result = metadata->Get(DataField::FIELD_ID);
+                if (!field_id_result.ok()) {
+                    throw std::invalid_argument("get FIELD_ID from meta data failed");
+                }
+                const std::string& raw_id = field_id_result.ValueUnsafe();
+                const std::optional<int32_t> parsed_id =
+                    StringUtils::StringToValue<int32_t>(raw_id);
+                if (!parsed_id.has_value()) {
+                    // An assert would vanish in release builds and silently emit id -1.
+                    throw std::invalid_argument("invalid FIELD_ID in meta data: " + raw_id);
+                }
+                field_id = parsed_id.value();
+            }
+            auto description_result = metadata->Get(DataField::DESCRIPTION);
+            if (description_result.ok()) {
+                description = description_result.ValueUnsafe();
+            }
+        }
+        fields.emplace_back(field_id, field, description);
+    }
+
+    obj.AddMember(rapidjson::StringRef("fields"),
+                  RapidJsonUtil::SerializeValue(fields, allocator).Move(), *allocator);
+    return obj;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/types/row_type.h 
b/src/paimon/common/types/row_type.h
new file mode 100644
index 0000000..f75b103
--- /dev/null
+++ b/src/paimon/common/types/row_type.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_type.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace arrow {
+class DataType;
+class KeyValueMetadata;
+}  // namespace arrow
+
+namespace paimon {
+
+/// Row data type: a `DataType` whose JSON form is produced by `RowType::ToJson`.
+class RowType : public DataType {
+ public:
+    /// Type name of a row type.
+    static constexpr char TYPE[] = "ROW";
+
+    /// Constructs a row type from an Arrow type, its nullability and optional metadata.
+    RowType(const std::shared_ptr<arrow::DataType>& type, bool nullable,
+            const std::shared_ptr<const arrow::KeyValueMetadata>& metadata);
+
+    /// Serializes this type into a JSON value allocated with `allocator`.
+    /// Declared `noexcept(false)`.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override;
+};
+
+}  // namespace paimon


Reply via email to