This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ba36d49  feat(schema): add table schema and schema manager (#54)
ba36d49 is described below

commit ba36d497bf41b4b1dd0408dd74bd772cf3f96a33
Author: Yonghao Fang <[email protected]>
AuthorDate: Mon Jun 8 15:02:30 2026 +0800

    feat(schema): add table schema and schema manager (#54)
---
 include/paimon/schema/schema.h                 |  110 ++
 src/paimon/core/schema/schema_manager.cpp      |  124 +++
 src/paimon/core/schema/schema_manager.h        |   75 ++
 src/paimon/core/schema/schema_manager_test.cpp |  203 ++++
 src/paimon/core/schema/table_schema.cpp        |  364 +++++++
 src/paimon/core/schema/table_schema.h          |  155 +++
 src/paimon/core/schema/table_schema_test.cpp   | 1301 ++++++++++++++++++++++++
 7 files changed, 2332 insertions(+)

diff --git a/include/paimon/schema/schema.h b/include/paimon/schema/schema.h
new file mode 100644
index 0000000..e3f557f
--- /dev/null
+++ b/include/paimon/schema/schema.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Common read-only interface exposing TableSchema-related information.
+class PAIMON_EXPORT Schema {
+ public:
+    virtual ~Schema() = default;
+
+    /// Get the Arrow C schema representation of this table schema.
+    /// @return The ArrowSchema on success, or an error status if conversion fails.
+    virtual Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const = 0;
+
+    /// Get a JSON string that represents the complete schema information.
+    /// @return The JSON schema text on success, or an error status on failure.
+    virtual Result<std::string> GetJsonSchema() const = 0;
+
+    /// Get the names of all fields in the table schema.
+    /// @return A vector of field names.
+    virtual std::vector<std::string> FieldNames() const = 0;
+
+    /// Get the type of a specific field.
+    /// @param field_name The name of the field to query.
+    /// @return The FieldType of the specified field, or an error status on failure.
+    virtual Result<FieldType> GetFieldType(const std::string& field_name) const = 0;
+
+    /// Get the optional comment describing the schema object.
+    /// @return The comment if set, or std::nullopt otherwise.
+    virtual std::optional<std::string> Comment() const = 0;
+};
+
+/// Schema contract for data tables: adds identity, key, bucket and option accessors.
+class PAIMON_EXPORT DataSchema : public Schema {
+ public:
+    ~DataSchema() override = default;
+
+    /// Get the unique identifier of this table schema.
+    /// @return The schema id.
+    virtual int64_t Id() const = 0;
+
+    /// Get the primary key field names.
+    /// @return A reference to the primary key names; empty if no primary keys are defined.
+    virtual const std::vector<std::string>& PrimaryKeys() const = 0;
+
+    /// Get the partition key field names.
+    /// @return A reference to the partition key names; empty if the table is not partitioned.
+    virtual const std::vector<std::string>& PartitionKeys() const = 0;
+
+    /// Get the bucket key field names used for bucketing.
+    /// @return A reference to the vector of bucket key names.
+    virtual const std::vector<std::string>& BucketKeys() const = 0;
+
+    /// Get the number of buckets configured for this table.
+    /// @return The number of buckets.
+    virtual int32_t NumBuckets() const = 0;
+
+    /// Get the highest field id assigned in this schema.
+    /// @return The maximum field id.
+    virtual int32_t HighestFieldId() const = 0;
+
+    /// Get the table-level options associated with this schema.
+    /// @return A reference to the option key-value pairs (e.g., file format, filesystem).
+    virtual const std::map<std::string, std::string>& Options() const = 0;
+};
+
+/// Schema contract for system tables; declares no members beyond those of Schema.
+class PAIMON_EXPORT SystemSchema : public Schema {
+ public:
+    ~SystemSchema() override = default;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager.cpp 
b/src/paimon/core/schema/schema_manager.cpp
new file mode 100644
index 0000000..2425cc4
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_manager.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/schema/schema_validation.h"
+#include "paimon/core/utils/branch_manager.h"
+#include "paimon/core/utils/file_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+/// Builds a manager on BranchManager::DEFAULT_MAIN_BRANCH by delegating to the
+/// three-argument constructor.
+SchemaManager::SchemaManager(const std::shared_ptr<FileSystem>& file_system,
+                             const std::string& table_root)
+    : SchemaManager(file_system, table_root, BranchManager::DEFAULT_MAIN_BRANCH) {}
+
+/// Builds a manager for `branch` of the table rooted at `table_root`; the branch name is
+/// passed through BranchManager::NormalizeBranch before it is stored.
+SchemaManager::SchemaManager(const std::shared_ptr<FileSystem>& file_system,
+                             const std::string& table_root, const std::string& branch)
+    : file_system_(file_system),
+      table_root_(table_root),
+      branch_(BranchManager::NormalizeBranch(branch)) {}
+
+/// Returns the path BranchManager derives from the table root and this manager's branch.
+std::string SchemaManager::BranchPath() const {
+    std::string branch_path = BranchManager::BranchPath(table_root_, branch_);
+    return branch_path;
+}
+
+/// Returns the path of the schema file for `schema_id`: "/schema/" + SCHEMA_PREFIX + id,
+/// joined onto the branch path.
+std::string SchemaManager::ToSchemaPath(int64_t schema_id) const {
+    std::string relative_path = "/schema/";
+    relative_path += SCHEMA_PREFIX;
+    relative_path += std::to_string(schema_id);
+    return PathUtil::JoinPath(BranchPath(), relative_path);
+}
+/// Loads the schema with the largest id found in the schema directory.
+/// @return The newest schema, an empty optional when no schema file is listed, or the error
+///         status of the listing / read step.
+Result<std::optional<std::shared_ptr<TableSchema>>> SchemaManager::Latest() const {
+    std::vector<int64_t> schema_ids;
+    PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFiles(file_system_, SchemaDirectory(),
+                                                       std::string(SCHEMA_PREFIX), &schema_ids));
+    if (schema_ids.empty()) {
+        return std::optional<std::shared_ptr<TableSchema>>();
+    }
+    const int64_t newest_id = *std::max_element(schema_ids.begin(), schema_ids.end());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<TableSchema> newest, ReadSchema(newest_id));
+    return std::optional<std::shared_ptr<TableSchema>>(newest);
+}
+
+/// Returns the schema with the given id, consulting the in-memory cache before reading and
+/// parsing the schema file; a freshly parsed schema is stored in the cache.
+Result<std::shared_ptr<TableSchema>> SchemaManager::ReadSchema(int64_t schema_id) const {
+    if (auto cached = schema_cache_.find(schema_id); cached != schema_cache_.end()) {
+        return cached->second;
+    }
+    std::string schema_path = ToSchemaPath(schema_id);
+    std::string json;
+    PAIMON_RETURN_NOT_OK(file_system_->ReadFile(schema_path, &json));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<TableSchema> loaded,
+                           TableSchema::CreateFromJson(json));
+    schema_cache_[schema_id] = loaded;
+    return loaded;
+}
+
+/// Returns the "/schema" directory joined onto the branch path.
+std::string SchemaManager::SchemaDirectory() const {
+    const std::string branch_path = BranchPath();
+    return PathUtil::JoinPath(branch_path, "/schema");
+}
+
+/// Asks the file system whether the schema file for `id` exists.
+Result<bool> SchemaManager::SchemaExists(int64_t id) const {
+    return file_system_->Exists(ToSchemaPath(id));
+}
+
+/// Lists the ids of all SCHEMA_PREFIX-named files in the schema directory, in the order
+/// FileUtils::ListVersionedFiles reports them.
+Result<std::vector<int64_t>> SchemaManager::ListAllIds() const {
+    const std::string schema_dir = SchemaDirectory();
+    std::vector<int64_t> schema_ids;
+    PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFiles(file_system_, schema_dir,
+                                                       std::string(SCHEMA_PREFIX), &schema_ids));
+    return schema_ids;
+}
+
+/// Creates the initial schema (id 0) of a table and stores it atomically.
+///
+/// @param schema Arrow schema describing the table fields.
+/// @param partition_keys Names of the partition key fields.
+/// @param primary_keys Names of the primary key fields.
+/// @param options Table-level options stored with the schema.
+/// @return The created schema, or an error status when a schema already exists, the schema
+///         cannot be built or validated, or the schema file could not be stored.
+Result<std::unique_ptr<TableSchema>> SchemaManager::CreateTable(
+    const std::shared_ptr<arrow::Schema>& schema, const std::vector<std::string>& partition_keys,
+    const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options) {
+    // A failed store is retried: if another writer created the schema in the meantime, the
+    // next Latest() call reports it as "exists". The attempts are bounded so that a store
+    // that keeps failing surfaces its error instead of spinning forever.
+    constexpr int32_t MAX_STORE_ATTEMPTS = 3;
+    for (int32_t attempt = 1;; ++attempt) {
+        PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
+                               Latest());
+        if (latest_schema) {
+            return Status::Invalid("Schema in filesystem exists, creation is not allowed.");
+        }
+        PAIMON_ASSIGN_OR_RAISE(
+            std::unique_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        PAIMON_RETURN_NOT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+        std::string schema_path = ToSchemaPath(0);
+        PAIMON_ASSIGN_OR_RAISE(std::string content, table_schema->ToJsonString());
+        auto status = file_system_->AtomicStore(schema_path, content);
+        if (status.ok()) {
+            return table_schema;
+        }
+        if (attempt >= MAX_STORE_ATTEMPTS) {
+            // Give up and report why the last store attempt failed.
+            return status;
+        }
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager.h 
b/src/paimon/core/schema/schema_manager.h
new file mode 100644
index 0000000..382fa30
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class FileSystem;
+
+/// Manages the versioned schema files of one branch of a table.
+class SchemaManager {
+ public:
+    /// Create a manager that works on the default main branch.
+    SchemaManager(const std::shared_ptr<FileSystem>& file_system, const std::string& table_root);
+    /// Create a manager that works on the specified branch.
+    SchemaManager(const std::shared_ptr<FileSystem>& file_system, const std::string& table_root,
+                  const std::string& branch);
+
+    /// Read the schema with the given id; the cache is consulted before the file system.
+    Result<std::shared_ptr<TableSchema>> ReadSchema(int64_t schema_id) const;
+    /// Read the schema with the largest id, or an empty optional when no schema is listed.
+    Result<std::optional<std::shared_ptr<TableSchema>>> Latest() const;
+    /// Create and store schema 0 of a new table; fails when a schema already exists.
+    Result<std::unique_ptr<TableSchema>> CreateTable(
+        const std::shared_ptr<arrow::Schema>& schema,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options);
+
+    /// Directory that holds the schema files of this branch.
+    std::string SchemaDirectory() const;
+    /// Whether the schema file for `id` exists.
+    Result<bool> SchemaExists(int64_t id) const;
+    /// Ids of all schema files found in the schema directory.
+    Result<std::vector<int64_t>> ListAllIds() const;
+
+ private:
+    std::string BranchPath() const;
+    std::string ToSchemaPath(int64_t schema_id) const;
+
+    /// File-name prefix of schema files; the schema id is appended to it.
+    static constexpr char SCHEMA_PREFIX[] = "schema-";
+
+    std::shared_ptr<FileSystem> file_system_;
+    std::string table_root_;
+    const std::string branch_;
+    /// Schemas already parsed by ReadSchema, keyed by schema id; mutable so that the const
+    /// read path can fill it.
+    mutable std::map<int64_t, std::shared_ptr<TableSchema>> schema_cache_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager_test.cpp 
b/src/paimon/core/schema/schema_manager_test.cpp
new file mode 100644
index 0000000..83f995a
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager_test.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_manager.h"
+
+#include <set>
+#include <utility>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Reads schema 1 of a checked-in table and compares it with the expected JSON; also checks
+// the schema path, the cache and Latest().
+TEST(SchemaManagerTest, TestSimple) {
+    const std::string table_root =
+        paimon::test::GetDataDir() + "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, table_root);
+    ASSERT_EQ(manager.ToSchemaPath(/*schema_id=*/0), table_root + "schema/schema-0");
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> actual_schema,
+                         manager.ReadSchema(/*schema_id=*/1));
+    std::string expected_json = R"({
+        "version" : 3,
+        "id" : 1,
+        "fields" : [ {
+                "id" : 1,
+                "name" : "key1",
+                "type" : "INT NOT NULL"
+        }, {
+                "id" : 7,
+                "name" : "k",
+                "type" : "STRING"
+        }, {
+                "id" : 2,
+                "name" : "key_2",
+                "type" : "INT NOT NULL"
+        }, {
+                "id" : 4,
+                "name" : "c",
+                "type" : "INT"
+        }, {
+                "id" : 8,
+                "name" : "d",
+                "type" : "INT",
+                "description" : ""
+        }, {
+                "id" : 6,
+                "name" : "a",
+                "type" : "INT"
+        }, {
+                "id" : 0,
+                "name" : "key0",
+                "type" : "INT NOT NULL"
+        }, {
+                "id" : 9,
+                "name" : "e",
+                "type" : "INT"
+        } ],
+        "highestFieldId" : 9,
+        "partitionKeys" : [ "key0", "key1" ],
+        "primaryKeys" : [ "key0", "key1", "key_2" ],
+        "options" : {
+                "bucket" : "1",
+                "manifest.format" : "orc",
+                "file.format" : "orc",
+                "deletion-vectors.enabled" : "true",
+                "commit.force-compact" : "true"
+        },
+        "timeMillis" : 1730516111087
+    })";
+    ASSERT_OK_AND_ASSIGN(auto expected_schema, TableSchema::CreateFromJson(expected_json));
+    ASSERT_EQ(*actual_schema, *expected_schema);
+    // The first read must have populated the cache; the second read is served from it.
+    ASSERT_FALSE(manager.schema_cache_.empty());
+    ASSERT_EQ(*manager.ReadSchema(/*schema_id=*/1).value(), *expected_schema);
+    ASSERT_EQ(*(manager.Latest().value().value()), *expected_schema);
+}
+
+// A table root without schema files yields no latest schema and a failing ReadSchema.
+TEST(SchemaManagerTest, TestNonExistTable) {
+    const std::string table_root = paimon::test::GetDataDir() + "/non-exist.db/non-exist/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, table_root);
+    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> latest, manager.Latest());
+    ASSERT_EQ(latest, std::nullopt);
+    auto missing = manager.ReadSchema(/*schema_id=*/100);
+    ASSERT_FALSE(missing.ok());
+}
+
+// On the default branch the schema directory sits directly under the table root.
+TEST(SchemaManagerTest, TestSchemaDirectory) {
+    const std::string table_root = paimon::test::GetDataDir() + "/sample_table/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, table_root);
+    const std::string expected_dir = paimon::test::GetDataDir() + "/sample_table/schema";
+    ASSERT_EQ(manager.SchemaDirectory(), expected_dir);
+}
+
+// A named branch gets its own schema directory; "main" maps to the table root.
+TEST(SchemaManagerTest, TestSchemaDirectoryWithBranch) {
+    const std::string table_root = paimon::test::GetDataDir() + "/sample_table/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    {
+        SchemaManager data_branch_manager(fs, table_root, /*branch=*/"data");
+        ASSERT_EQ(data_branch_manager.SchemaDirectory(),
+                  paimon::test::GetDataDir() + "/sample_table/branch/branch-data/schema");
+    }
+    {
+        SchemaManager main_branch_manager(fs, table_root, /*branch=*/"main");
+        ASSERT_EQ(main_branch_manager.SchemaDirectory(),
+                  paimon::test::GetDataDir() + "/sample_table/schema");
+    }
+}
+
+// CreateTable is expected to fail for this input, where the partition keys and the primary
+// keys are both {"id"}.
+TEST(SchemaManagerTest, TestCreateTableWithInvalidInput) {
+    auto dir = UniqueTestDirectory::Create();
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, dir->Str());
+
+    // Arrow schema: id (int32, not null), name (utf8), value (int64).
+    auto id_field = std::make_shared<arrow::Field>("id", arrow::int32(), false);
+    auto name_field = std::make_shared<arrow::Field>("name", arrow::utf8());
+    auto value_field = std::make_shared<arrow::Field>("value", arrow::int64());
+    auto schema = arrow::schema(arrow::FieldVector{id_field, name_field, value_field});
+
+    std::vector<std::string> partition_keys = {"id"};
+    std::vector<std::string> primary_keys = {"id"};
+    std::map<std::string, std::string> options = {{"file.format", "orc"},
+                                                  {"commit.force-compact", "true"}};
+
+    auto result = manager.CreateTable(schema, partition_keys, primary_keys, options);
+    ASSERT_NOK(result);
+}
+
+// Creates a table in a fresh directory and verifies the stored schema through Latest().
+TEST(SchemaManagerTest, TestCreateTable) {
+    auto dir = UniqueTestDirectory::Create();
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, dir->Str());
+
+    // Arrow schema: id (int32, not null), name (utf8), value (int64).
+    auto id_field = std::make_shared<arrow::Field>("id", arrow::int32(), false);
+    auto name_field = std::make_shared<arrow::Field>("name", arrow::utf8());
+    auto value_field = std::make_shared<arrow::Field>("value", arrow::int64());
+    auto schema = arrow::schema(arrow::FieldVector{id_field, name_field, value_field});
+
+    std::vector<std::string> partition_keys = {"name"};
+    std::vector<std::string> primary_keys = {"id"};
+    std::map<std::string, std::string> options = {{"file.format", "orc"},
+                                                  {"commit.force-compact", "true"}};
+
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::unique_ptr<TableSchema> result,
+                         manager.CreateTable(schema, partition_keys, primary_keys, options));
+
+    // The schema must now be visible as the latest one, with id 0 and the requested keys.
+    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> latest_result,
+                         manager.Latest());
+    ASSERT_TRUE(latest_result);
+    auto created_schema = latest_result.value();
+    ASSERT_EQ(created_schema->Id(), 0);
+    ASSERT_EQ(created_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(created_schema->PrimaryKeys(), primary_keys);
+}
+
+// CreateTable must be rejected when the table root already contains a schema.
+TEST(SchemaManagerTest, TestCreateTableAlreadyExists) {
+    const std::string table_root =
+        paimon::test::GetDataDir() + "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, table_root);
+
+    // Any Arrow schema will do; creation fails before it matters.
+    auto dummy_field = std::make_shared<arrow::Field>("dummy", arrow::int32());
+    auto schema = arrow::schema(arrow::FieldVector{dummy_field});
+
+    ASSERT_NOK_WITH_MSG(manager.CreateTable(schema, {}, {}, {}), "Schema in filesystem exists");
+}
+
+// ListAllIds reports every schema id of the table; the comparison ignores order.
+TEST(SchemaManagerTest, TestListAllIds) {
+    const std::string table_root =
+        paimon::test::GetDataDir() + "/orc/pk_table_with_mor.db/pk_table_with_mor/";
+    auto fs = std::make_shared<LocalFileSystem>();
+    SchemaManager manager(fs, table_root);
+    ASSERT_OK_AND_ASSIGN(auto ids, manager.ListAllIds());
+    const std::set<int64_t> actual_ids(ids.begin(), ids.end());
+    ASSERT_EQ(actual_ids, std::set<int64_t>({0, 1, 2, 3, 4}));
+}
+}  // namespace paimon::test
diff --git a/src/paimon/core/schema/table_schema.cpp 
b/src/paimon/core/schema/table_schema.cpp
new file mode 100644
index 0000000..a029461
--- /dev/null
+++ b/src/paimon/core/schema/table_schema.cpp
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/table_schema.h"
+
+#include <algorithm>
+#include <iterator>
+#include <set>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/defs.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Builds the initial table schema from an Arrow schema.
+///
+/// Field ids are assigned in traversal order starting at 0, and fields named in
+/// `primary_keys` are made non-nullable.
+///
+/// @param schema_id Must be 0; other ids are rejected with NotImplemented.
+/// @param schema Arrow schema describing the fields; must not be null.
+/// @param partition_keys Names of the partition key fields.
+/// @param primary_keys Names of the primary key fields.
+/// @param options Table-level options.
+/// @return The initialized schema, or an error status.
+Result<std::unique_ptr<TableSchema>> TableSchema::Create(
+    int64_t schema_id, const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<std::string>& partition_keys, const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options) {
+    if (schema_id != 0) {
+        return Status::NotImplemented("do not support schema evolution, schema_id must be 0");
+    }
+    if (schema == nullptr) {
+        // Report a status instead of dereferencing a null pointer below.
+        return Status::Invalid("arrow schema must not be null");
+    }
+    const std::set<std::string> primary_key_set(primary_keys.begin(), primary_keys.end());
+    std::vector<DataField> data_fields;
+    data_fields.reserve(schema->fields().size());
+    // Next id to hand out; after the loop it equals the number of ids assigned.
+    int32_t field_id = 0;
+    for (const auto& field : schema->fields()) {
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr<arrow::Field> field_with_id,
+            AssignFieldIdsRecursively(field, /*assign_id_to_self=*/true, &field_id));
+        if (primary_key_set.count(field_with_id->name()) > 0) {
+            field_with_id = field_with_id->WithNullable(false);
+        }
+        PAIMON_ASSIGN_OR_RAISE(DataField data_field,
+                               DataField::ConvertArrowFieldToDataField(field_with_id));
+        data_fields.push_back(std::move(data_field));
+    }
+    return InitSchema(schema_id, data_fields, field_id - 1, partition_keys, primary_keys, options,
+                      /*comment=*/std::nullopt, DateTimeUtils::GetCurrentUTCTimeUs() / 1000);
+}
+
+/// Builds field metadata that carries `field_id` under DataField::FIELD_ID, merged with the
+/// metadata the field already has.
+/// @return The metadata, or Invalid when the field already records a different field id.
+Result<std::shared_ptr<arrow::KeyValueMetadata>> TableSchema::MakeMetaDataWithFieldId(
+    const std::shared_ptr<arrow::Field>& field, int32_t field_id) {
+    const std::string expected_id = std::to_string(field_id);
+    std::vector<std::string> keys = {std::string(DataField::FIELD_ID)};
+    std::vector<std::string> values = {expected_id};
+    std::shared_ptr<arrow::KeyValueMetadata> metadata = arrow::KeyValueMetadata::Make(keys, values);
+    if (!field->HasMetadata() || !field->metadata()) {
+        return metadata;
+    }
+    if (field->metadata()->Contains(DataField::FIELD_ID)) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::string field_id_result,
+                                          field->metadata()->Get(DataField::FIELD_ID));
+        if (expected_id != field_id_result) {
+            return Status::Invalid(fmt::format("field id {} not match with {} {} in metadata",
+                                               field_id, DataField::FIELD_ID, field_id_result));
+        }
+    }
+    metadata = metadata->Merge(*field->metadata());
+    return metadata;
+}
+
+/// Returns a copy of `field` with field ids recorded in the metadata.
+///
+/// When `assign_id_to_self` is true the field receives `*field_id`, which is then
+/// incremented. Struct children always receive ids; list value fields and map key/item
+/// fields do not receive an id themselves but are still traversed.
+Result<std::shared_ptr<arrow::Field>> TableSchema::AssignFieldIdsRecursively(
+    const std::shared_ptr<arrow::Field>& field, bool assign_id_to_self, int32_t* field_id) {
+    std::shared_ptr<arrow::KeyValueMetadata> metadata;
+    if (assign_id_to_self) {
+        PAIMON_ASSIGN_OR_RAISE(metadata, MakeMetaDataWithFieldId(field, *field_id));
+        ++(*field_id);
+    }
+    switch (field->type()->id()) {
+        case arrow::Type::STRUCT: {
+            auto struct_type =
+                arrow::internal::checked_pointer_cast<arrow::StructType>(field->type());
+            arrow::FieldVector children_with_ids;
+            for (const auto& child : struct_type->fields()) {
+                PAIMON_ASSIGN_OR_RAISE(
+                    auto child_with_id,
+                    AssignFieldIdsRecursively(child, /*assign_id_to_self=*/true, field_id));
+                children_with_ids.push_back(child_with_id);
+            }
+            return arrow::field(field->name(), arrow::struct_(children_with_ids),
+                                field->nullable(), metadata);
+        }
+        case arrow::Type::LIST: {
+            auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(field->type());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> value_with_ids,
+                                   AssignFieldIdsRecursively(list_type->value_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            return arrow::field(field->name(), arrow::list(value_with_ids), field->nullable(),
+                                metadata);
+        }
+        case arrow::Type::MAP: {
+            auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(field->type());
+            // Key first, then item, so nested ids are handed out in that order.
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> key_with_ids,
+                                   AssignFieldIdsRecursively(map_type->key_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> item_with_ids,
+                                   AssignFieldIdsRecursively(map_type->item_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            return arrow::field(field->name(), arrow::map(key_with_ids->type(), item_with_ids),
+                                field->nullable(), metadata);
+        }
+        default:
+            break;
+    }
+    return metadata ? field->WithMergedMetadata(metadata) : field;
+}
+
+/// Serializes this schema into a JSON object; "comment" is emitted only when set.
+rapidjson::Value TableSchema::ToJson(rapidjson::Document::AllocatorType* allocator) const
+    noexcept(false) {
+    rapidjson::Value obj(rapidjson::kObjectType);
+    // Serializes `value` and appends it under `key`; the call order is the member order.
+    const auto add_member = [&obj, allocator](const auto& key, const auto& value) {
+        obj.AddMember(rapidjson::StringRef(key),
+                      RapidJsonUtil::SerializeValue(value, allocator).Move(), *allocator);
+    };
+    add_member("version", version_);
+    add_member("id", id_);
+    add_member("fields", fields_);
+    add_member("highestFieldId", highest_field_id_);
+    add_member("partitionKeys", partition_keys_);
+    add_member("primaryKeys", primary_keys_);
+    add_member("options", options_);
+    if (comment_) {
+        add_member("comment", comment_);
+    }
+    add_member("timeMillis", time_millis_);
+    return obj;
+}
+
+/// Stores every argument in the member of the same name; no validation is performed here.
+TableSchema::TableSchema(int32_t version, int64_t id, const std::vector<DataField>& fields,
+                         int32_t highest_field_id,
+                         const std::vector<std::string>& partition_keys,
+                         const std::vector<std::string>& primary_keys,
+                         const std::map<std::string, std::string>& options,
+                         const std::optional<std::string>& comment, int64_t time_millis)
+    : version_(version),
+      id_(id),
+      fields_(fields),
+      highest_field_id_(highest_field_id),
+      partition_keys_(partition_keys),
+      primary_keys_(primary_keys),
+      options_(options),
+      comment_(comment),
+      time_millis_(time_millis) {}
+
+/// Compares version, fields, partition keys, primary keys, options, comment and creation
+/// time. The schema id and the highest field id do not take part in the comparison.
+bool TableSchema::operator==(const TableSchema& other) const {
+    if (!(version_ == other.version_ && fields_ == other.fields_)) {
+        return false;
+    }
+    if (!(partition_keys_ == other.partition_keys_ && primary_keys_ == other.primary_keys_)) {
+        return false;
+    }
+    if (!(options_ == other.options_ && comment_ == other.comment_)) {
+        return false;
+    }
+    return time_millis_ == other.time_millis_;
+}
+
+/// Converts the data fields to an Arrow schema and exports it as an Arrow C schema.
+/// @return The exported schema, or the error status of arrow::ExportSchema.
+Result<std::unique_ptr<::ArrowSchema>> TableSchema::GetArrowSchema() const {
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields_);
+    auto exported = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, exported.get()));
+    return exported;
+}
+
+/// Collects the name of every field, in field order.
+std::vector<std::string> TableSchema::FieldNames() const {
+    std::vector<std::string> names;
+    names.reserve(fields_.size());
+    for (const DataField& field : fields_) {
+        names.push_back(field.Name());
+    }
+    return names;
+}
+
+/// Looks up the named field and converts its Arrow type id to a FieldType.
+/// @return The field type, or the error of the lookup / conversion.
+Result<FieldType> TableSchema::GetFieldType(const std::string& field_name) const {
+    PAIMON_ASSIGN_OR_RAISE(DataField matched_field, GetField(field_name));
+    const auto arrow_type_id = matched_field.Type()->id();
+    return FieldTypeUtils::ConvertToFieldType(arrow_type_id);
+}
+
+/// Returns a copy of the first field whose name equals `field_name`.
+///
+/// @return Status::Invalid when no field in this schema has that name.
+Result<DataField> TableSchema::GetField(const std::string& field_name) const {
+    const auto& all_fields = Fields();
+    const auto match =
+        std::find_if(all_fields.begin(), all_fields.end(),
+                     [&field_name](const DataField& candidate) {
+                         return candidate.Name() == field_name;
+                     });
+    if (match == all_fields.end()) {
+        return Status::Invalid(fmt::format(
+            "Get field {} failed: not exist in table schema", field_name));
+    }
+    return *match;
+}
+
+/// Returns a copy of the first field whose id equals `field_id`.
+///
+/// Only the entries of `fields_` themselves are inspected.
+///
+/// @return Status::Invalid when no field in this schema has that id.
+Result<DataField> TableSchema::GetField(int32_t field_id) const {
+    const auto& all_fields = Fields();
+    const auto match = std::find_if(all_fields.begin(), all_fields.end(),
+                                    [field_id](const DataField& candidate) {
+                                        return candidate.Id() == field_id;
+                                    });
+    if (match == all_fields.end()) {
+        return Status::Invalid(fmt::format(
+            "Get field with id {} failed: not exist in table schema",
+            field_id));
+    }
+    return *match;
+}
+
+/// Resolves each name in `field_names` to its field, keeping the input order.
+///
+/// Stops at the first name that GetField() cannot resolve and returns that
+/// error.
+Result<std::vector<DataField>> TableSchema::GetFields(
+    const std::vector<std::string>& field_names) const {
+    std::vector<DataField> resolved;
+    resolved.reserve(field_names.size());
+    for (const std::string& wanted : field_names) {
+        PAIMON_ASSIGN_OR_RAISE(DataField found, GetField(wanted));
+        resolved.push_back(found);
+    }
+    return resolved;
+}
+
+/// Parses `json_str` into a schema and runs it through InitSchema().
+///
+/// The version read from the JSON is not forwarded: InitSchema() always
+/// stamps the result with CURRENT_VERSION. All other parsed members are
+/// passed on unchanged.
+Result<std::unique_ptr<TableSchema>> TableSchema::CreateFromJson(
+    const std::string& json_str) {
+    PAIMON_ASSIGN_OR_RAISE(TableSchema parsed,
+                           TableSchema::FromJsonString(json_str));
+    const TableSchema& src = parsed;
+    return InitSchema(src.id_, src.fields_, src.highest_field_id_,
+                      src.partition_keys_, src.primary_keys_, src.options_,
+                      src.comment_, src.time_millis_);
+}
+
+/// Validates the given parts and assembles a fully initialised TableSchema.
+///
+/// Steps, in order; each one aborts with its own error:
+///   1. the fields, converted to an Arrow schema, must pass
+///      ArrowSchemaValidator::ValidateSchemaWithFieldId;
+///   2. TrimmedPrimaryKeys() must succeed;
+///   3. OriginalBucketKeys() must succeed; when it yields nothing the trimmed
+///      primary keys are used as bucket keys instead;
+///   4. the bucket count is read from Options::BUCKET, defaulting to -1.
+///
+/// The returned schema always carries CURRENT_VERSION.
+Result<std::unique_ptr<TableSchema>> TableSchema::InitSchema(
+    int64_t schema_id, const std::vector<DataField>& fields,
+    int32_t highest_field_id, const std::vector<std::string>& partition_keys,
+    const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options,
+    const std::optional<std::string>& comment, int64_t time_millis) {
+    // Validate the field layout before building anything.
+    const auto fields_as_arrow =
+        DataField::ConvertDataFieldsToArrowSchema(fields);
+    PAIMON_RETURN_NOT_OK(
+        ArrowSchemaValidator::ValidateSchemaWithFieldId(*fields_as_arrow));
+
+    // The constructor is private, so std::make_unique cannot be used here.
+    std::unique_ptr<TableSchema> built(new TableSchema(
+        TableSchema::CURRENT_VERSION, schema_id, fields, highest_field_id,
+        partition_keys, primary_keys, options, comment, time_millis));
+
+    // Evaluated before the bucket keys so that its error is reported first.
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_pks,
+                           built->TrimmedPrimaryKeys());
+
+    // Explicit bucket keys win; otherwise fall back to the trimmed primary
+    // keys.
+    PAIMON_ASSIGN_OR_RAISE(built->bucket_keys_, built->OriginalBucketKeys());
+    if (built->bucket_keys_.empty()) {
+        built->bucket_keys_ = trimmed_pks;
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(built->num_bucket_,
+                           OptionsUtils::GetValueFromMap<int32_t>(
+                               built->Options(), Options::BUCKET, -1));
+    return built;
+}
+
+/// Fills this object from a parsed schema JSON object.
+///
+/// Keys read: "version" (defaults to PAIMON_07_VERSION when absent), "id",
+/// "fields", "highestFieldId", "partitionKeys", "primaryKeys", "options",
+/// "comment" (keeps the current value when absent) and "timeMillis"
+/// (defaults to 0 when absent).
+///
+/// Defaults that older versions applied implicitly are written into the
+/// options when missing: "1" for Options::BUCKET up to PAIMON_07_VERSION and
+/// "orc" for Options::FILE_FORMAT up to PAIMON_08_VERSION.
+///
+/// Declared noexcept(false); presumably the deserialization helpers throw on
+/// malformed input - confirm against RapidJsonUtil.
+void TableSchema::FromJson(const rapidjson::Value& obj) noexcept(false) {
+    using StringList = std::vector<std::string>;
+    using StringMap = std::map<std::string, std::string>;
+    using FieldList = std::vector<DataField>;
+    using OptionalText = std::optional<std::string>;
+
+    version_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(
+        obj, "version", PAIMON_07_VERSION);
+    id_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, "id");
+    fields_ = RapidJsonUtil::DeserializeKeyValue<FieldList>(obj, "fields");
+    highest_field_id_ =
+        RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, "highestFieldId");
+    partition_keys_ =
+        RapidJsonUtil::DeserializeKeyValue<StringList>(obj, "partitionKeys");
+    primary_keys_ =
+        RapidJsonUtil::DeserializeKeyValue<StringList>(obj, "primaryKeys");
+    options_ = RapidJsonUtil::DeserializeKeyValue<StringMap>(obj, "options");
+
+    // the default value of BUCKET in old version is 1
+    const bool bucket_missing = options_.count(Options::BUCKET) == 0;
+    if (version_ <= PAIMON_07_VERSION && bucket_missing) {
+        options_[Options::BUCKET] = "1";
+    }
+
+    // the default value of FILE_FORMAT in old version is orc
+    const bool format_missing = options_.count(Options::FILE_FORMAT) == 0;
+    if (version_ <= PAIMON_08_VERSION && format_missing) {
+        options_[Options::FILE_FORMAT] = "orc";
+    }
+
+    comment_ = RapidJsonUtil::DeserializeKeyValue<OptionalText>(
+        obj, "comment", comment_);
+    time_millis_ =
+        RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, "timeMillis", 0);
+}
+
+/// Primary keys with every partition key removed, in primary-key order.
+///
+/// @return An empty list when the table has no primary keys; Status::Invalid
+///         when every primary key is also a partition key; otherwise the
+///         remaining primary keys.
+Result<std::vector<std::string>> TableSchema::TrimmedPrimaryKeys() const {
+    if (primary_keys_.empty()) {
+        return primary_keys_;
+    }
+
+    const std::set<std::string> partition_key_set(partition_keys_.begin(),
+                                                  partition_keys_.end());
+    std::vector<std::string> trimmed;
+    trimmed.reserve(primary_keys_.size());
+    for (const std::string& pk : primary_keys_) {
+        if (partition_key_set.count(pk) == 0) {
+            trimmed.push_back(pk);
+        }
+    }
+
+    if (trimmed.empty()) {
+        return Status::Invalid(fmt::format(
+            "Primary key constraint {} should not be same with partition "
+            "fields {}, this will result in only one record in a partition",
+            primary_keys_, partition_keys_));
+    }
+    return trimmed;
+}
+
+/// Field definitions of the trimmed primary keys, in the same order.
+///
+/// Propagates any error from TrimmedPrimaryKeys() or GetFields().
+Result<std::vector<DataField>> TableSchema::TrimmedPrimaryKeyFields() const {
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> pk_names,
+                           TrimmedPrimaryKeys());
+    return GetFields(pk_names);
+}
+
+/// Arrow schema made of the trimmed primary key fields only.
+///
+/// Propagates any error from TrimmedPrimaryKeyFields().
+Result<std::shared_ptr<arrow::Schema>> TableSchema::TrimmedPrimaryKeySchema()
+    const {
+    PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> key_fields,
+                           TrimmedPrimaryKeyFields());
+    return DataField::ConvertDataFieldsToArrowSchema(key_fields);
+}
+
+/// Bucket keys exactly as configured through Options::BUCKET_KEY; may be
+/// empty.
+///
+/// The option value is split on "," without dropping empty pieces. An absent
+/// or whitespace-only option yields an empty list. Otherwise the keys must
+///   - all be field names of this schema,
+///   - not contain any partition key, and
+///   - all be primary keys when the table has primary keys;
+/// a violation of any of these returns Status::Invalid.
+Result<std::vector<std::string>> TableSchema::OriginalBucketKeys() const {
+    std::vector<std::string> bucket_keys;
+    const auto option_it = options_.find(Options::BUCKET_KEY);
+    if (option_it == options_.end()) {
+        return bucket_keys;
+    }
+    const std::string& raw_value = option_it->second;
+    if (StringUtils::IsNullOrWhitespaceOnly(raw_value)) {
+        return bucket_keys;
+    }
+    bucket_keys = StringUtils::Split(raw_value, ",", /*ignore_empty=*/false);
+
+    // Computed once and reused for the check and the error message.
+    const std::vector<std::string> field_names = FieldNames();
+    if (!ObjectUtils::ContainsAll(field_names, bucket_keys)) {
+        return Status::Invalid(
+            fmt::format("Field names {} should contain all bucket keys {}.",
+                        field_names, bucket_keys));
+    }
+
+    const bool overlaps_partition = std::any_of(
+        bucket_keys.begin(), bucket_keys.end(),
+        [this](const std::string& bucket_key) {
+            return std::find(partition_keys_.begin(), partition_keys_.end(),
+                             bucket_key) != partition_keys_.end();
+        });
+    if (overlaps_partition) {
+        return Status::Invalid(
+            fmt::format("Bucket keys {} should not in partition keys {}.",
+                        bucket_keys, partition_keys_));
+    }
+
+    if (!primary_keys_.empty() &&
+        !ObjectUtils::ContainsAll(primary_keys_, bucket_keys)) {
+        return Status::Invalid(
+            fmt::format("Primary keys {} should contain all bucket keys {}.",
+                        primary_keys_, bucket_keys));
+    }
+    return bucket_keys;
+}
+
+/// Tells whether at least one partition key is missing from the primary keys.
+///
+/// Always false when the table has no primary keys or no partition keys.
+bool TableSchema::CrossPartitionUpdate() const {
+    const bool has_both_key_kinds =
+        !primary_keys_.empty() && !partition_keys_.empty();
+    // If any partition key is not in primary keys, return true
+    return has_both_key_kinds &&
+           !ObjectUtils::ContainsAll(primary_keys_, partition_keys_);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/table_schema.h 
b/src/paimon/core/schema/table_schema.h
new file mode 100644
index 0000000..c878151
--- /dev/null
+++ b/src/paimon/core/schema/table_schema.h
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+/// Schema of a table, including schemaId and fieldId.
+///
+/// Instances are obtained through the static factories Create() and
+/// CreateFromJson(); the value constructor is private. The class can be
+/// converted to and from JSON through the Jsonizable base.
+class TableSchema : public DataSchema, public Jsonizable<TableSchema> {
+ public:
+    /// Id of the first schema of a table.
+    static constexpr int64_t FIRST_SCHEMA_ID = 0;
+    /// Schema format versions. FromJson() assumes PAIMON_07_VERSION when the
+    /// JSON has no "version" key and back-fills option defaults for versions
+    /// up to PAIMON_07_VERSION / PAIMON_08_VERSION.
+    static constexpr int32_t PAIMON_07_VERSION = 1;
+    static constexpr int32_t PAIMON_08_VERSION = 2;
+    /// Version stamped on every schema built by InitSchema().
+    static constexpr int32_t CURRENT_VERSION = 3;
+
+    /// Creates a schema from an Arrow schema plus key and option settings.
+    /// NOTE(review): defined outside this view; per the unit tests, field ids
+    /// are assigned when no field carries one, kept when all do, and a mix of
+    /// both is rejected.
+    static Result<std::unique_ptr<TableSchema>> Create(
+        int64_t schema_id, const std::shared_ptr<arrow::Schema>& schema,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options);
+
+    /// Parses a schema JSON document and validates it through InitSchema().
+    static Result<std::unique_ptr<TableSchema>> CreateFromJson(const 
std::string& json_str);
+
+    /// Serializes this schema into a rapidjson value using `allocator`.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+        noexcept(false) override;
+
+    /// Fills this schema from a parsed JSON object.
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+    /// Compares version, fields, partition keys, primary keys, options,
+    /// comment and creation time; id and highest field id are not compared.
+    bool operator==(const TableSchema& other) const;
+
+    /// Exports the fields as an Arrow C data interface schema.
+    Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const override;
+
+    /// Returns the JSON string form produced by ToJsonString().
+    Result<std::string> GetJsonSchema() const override {
+        return ToJsonString();
+    }
+
+    /// Names of all fields, in storage order.
+    std::vector<std::string> FieldNames() const override;
+
+    /// FieldType of the field called `field_name`, or an error if absent.
+    Result<FieldType> GetFieldType(const std::string& field_name) const 
override;
+
+    /// Schema id.
+    int64_t Id() const override {
+        return id_;
+    }
+    /// Primary keys as declared, including any that are partition keys.
+    const std::vector<std::string>& PrimaryKeys() const override {
+        return primary_keys_;
+    }
+    /// Partition keys as declared.
+    const std::vector<std::string>& PartitionKeys() const override {
+        return partition_keys_;
+    }
+
+    /// Bucket keys resolved by InitSchema(): the configured bucket keys, or
+    /// the trimmed primary keys when none are configured.
+    const std::vector<std::string>& BucketKeys() const override {
+        return bucket_keys_;
+    }
+
+    /// Bucket count read from the options by InitSchema(); -1 when unset.
+    int32_t NumBuckets() const override {
+        return num_bucket_;
+    }
+    /// Highest field id recorded for this schema.
+    int32_t HighestFieldId() const override {
+        return highest_field_id_;
+    }
+    /// Table options as stored in the schema.
+    const std::map<std::string, std::string>& Options() const override {
+        return options_;
+    }
+    /// Field definitions, in storage order.
+    const std::vector<DataField>& Fields() const {
+        return fields_;
+    }
+
+    /// Copy of the field named `field_name`; Status::Invalid when absent.
+    Result<DataField> GetField(const std::string& field_name) const;
+
+    /// Copy of the field with id `field_id`; Status::Invalid when absent.
+    Result<DataField> GetField(int32_t field_id) const;
+
+    /// Fields for `field_names`, in the given order; fails on the first
+    /// unknown name.
+    Result<std::vector<DataField>> GetFields(const std::vector<std::string>& 
field_names) const;
+    /// Primary keys without partition keys; Status::Invalid when that leaves
+    /// nothing although primary keys are declared.
+    Result<std::vector<std::string>> TrimmedPrimaryKeys() const;
+    /// Field definitions of the trimmed primary keys.
+    Result<std::vector<DataField>> TrimmedPrimaryKeyFields() const;
+    /// Arrow schema built from the trimmed primary key fields.
+    Result<std::shared_ptr<arrow::Schema>> TrimmedPrimaryKeySchema() const;
+
+    /// Optional table comment.
+    std::optional<std::string> Comment() const override {
+        return comment_;
+    }
+
+    /// True when both key lists are non-empty and some partition key is not a
+    /// primary key.
+    bool CrossPartitionUpdate() const;
+
+ private:
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(TableSchema);
+
+    /// Validates the parts and builds a schema stamped with CURRENT_VERSION,
+    /// including the derived bucket keys and bucket count.
+    static Result<std::unique_ptr<TableSchema>> InitSchema(
+        int64_t schema_id, const std::vector<DataField>& fields, int32_t 
highest_field_id,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options,
+        const std::optional<std::string>& comment, int64_t time_millis);
+
+    /// Member-wise constructor; does not validate and does not set the
+    /// derived bucket members.
+    TableSchema(int32_t version, int64_t id, const std::vector<DataField>& 
fields,
+                int32_t highest_field_id, const std::vector<std::string>& 
partition_keys,
+                const std::vector<std::string>& primary_keys,
+                const std::map<std::string, std::string>& options,
+                const std::optional<std::string>& comment, int64_t 
time_millis);
+
+    /// Bucket keys exactly as configured in the options (validated); empty
+    /// when the option is absent or blank.
+    Result<std::vector<std::string>> OriginalBucketKeys() const;
+
+    // NOTE(review): the two helpers below are defined outside this view;
+    // their names suggest they attach field-id metadata to Arrow fields.
+    static Result<std::shared_ptr<arrow::Field>> AssignFieldIdsRecursively(
+        const std::shared_ptr<arrow::Field>& field, bool set_field_id, 
int32_t* field_id);
+
+    static Result<std::shared_ptr<arrow::KeyValueMetadata>> 
MakeMetaDataWithFieldId(
+        const std::shared_ptr<arrow::Field>& field, int32_t field_id);
+
+ private:
+    // version of schema for paimon
+    int32_t version_ = -1;
+    int64_t id_ = -1;
+    std::vector<DataField> fields_;
+    /// Not available from fields, as some fields may have been deleted.
+    int32_t highest_field_id_ = -1;
+    std::vector<std::string> partition_keys_;
+    std::vector<std::string> primary_keys_;
+    // Derived by InitSchema(); not part of operator==.
+    std::vector<std::string> bucket_keys_;
+    // Derived by InitSchema() from Options::BUCKET; -1 when not configured.
+    int32_t num_bucket_ = -1;
+    std::map<std::string, std::string> options_;
+    std::optional<std::string> comment_;
+    // Creation time; the "timeMillis" JSON key.
+    int64_t time_millis_ = -1;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/schema/table_schema_test.cpp 
b/src/paimon/core/schema/table_schema_test.cpp
new file mode 100644
index 0000000..dedb3bc
--- /dev/null
+++ b/src/paimon/core/schema/table_schema_test.cpp
@@ -0,0 +1,1301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/table_schema.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture providing helpers shared by the TableSchema tests.
+class TableSchemaTest : public ::testing::Test {
+ public:
+    /// Builds the five nullable Arrow fields "sub1" .. "sub5" (date32,
+    /// timestamp[ns], decimal128(23, 5), binary, binary). Field i carries the
+    /// field-id metadata value `id + i`, i.e. the ids `id` .. `id + 4`.
+    arrow::FieldVector MakeArrowField(int32_t id) const {
+        arrow::FieldVector result;
+        result.reserve(5);
+        const auto append = [&result, id](
+                                const std::string& name,
+                                const std::shared_ptr<arrow::DataType>& type,
+                                int32_t offset) {
+            auto metadata = arrow::KeyValueMetadata::Make(
+                {std::string(DataField::FIELD_ID)},
+                {std::to_string(id + offset)});
+            result.push_back(std::make_shared<arrow::Field>(
+                name, type, /*nullable=*/true, metadata));
+        };
+        append("sub1", arrow::date32(), 0);
+        append("sub2", arrow::timestamp(arrow::TimeUnit::type::NANO), 1);
+        append("sub3", arrow::decimal128(23, 5), 2);
+        append("sub4", arrow::binary(), 3);
+        append("sub5", arrow::binary(), 4);
+        return result;
+    }
+
+    /// Returns `str` with every space, tab and newline removed.
+    std::string ReplaceAll(const std::string& str) {
+        const std::string no_spaces = StringUtils::Replace(str, " ", "");
+        const std::string no_tabs = StringUtils::Replace(no_spaces, "\t", "");
+        return StringUtils::Replace(no_tabs, "\n", "");
+    }
+};
+
+// Creating a schema from Arrow fields that carry no field-id metadata: the
+// fields are expected to get the ids 0..13 in declaration order, and the
+// primary key fields f3/f4 are expected to become non-nullable.
+TEST_F(TableSchemaTest, TestCreateWithAllFieldsNotHaveFieldId) {
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()),  arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int8()),     arrow::field("f3", 
arrow::int16()),
+        arrow::field("f4", arrow::int16()),    arrow::field("f5", 
arrow::int32()),
+        arrow::field("f6", arrow::int32()),    arrow::field("f7", 
arrow::int64()),
+        arrow::field("f8", arrow::int64()),    arrow::field("f9", 
arrow::float32()),
+        arrow::field("f10", arrow::float64()), arrow::field("f11", 
arrow::utf8()),
+        arrow::field("f12", arrow::binary()),  
arrow::field("non-partition-field", arrow::int32())};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> partition_keys = {"f1", "f2"};
+    std::vector<std::string> primary_keys = {"f3", "f4"};
+    std::map<std::string, std::string> options;
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_TRUE(table_schema);
+
+    // Expected result: sequential ids; only the primary keys lose nullability.
+    std::vector<DataField> expected_data_fields = {
+        DataField(0, arrow::field("f0", arrow::boolean())),
+        DataField(1, arrow::field("f1", arrow::int8())),
+        DataField(2, arrow::field("f2", arrow::int8())),
+        DataField(3, arrow::field("f3", arrow::int16(), /*nullable=*/false)),
+        DataField(4, arrow::field("f4", arrow::int16(), /*nullable=*/false)),
+        DataField(5, arrow::field("f5", arrow::int32())),
+        DataField(6, arrow::field("f6", arrow::int32())),
+        DataField(7, arrow::field("f7", arrow::int64())),
+        DataField(8, arrow::field("f8", arrow::int64())),
+        DataField(9, arrow::field("f9", arrow::float32())),
+        DataField(10, arrow::field("f10", arrow::float64())),
+        DataField(11, arrow::field("f11", arrow::utf8())),
+        DataField(12, arrow::field("f12", arrow::binary())),
+        DataField(13, arrow::field("non-partition-field", arrow::int32()))};
+
+    ASSERT_EQ(table_schema->Fields(), expected_data_fields);
+    ASSERT_EQ(table_schema->Id(), 0);
+    ASSERT_EQ(table_schema->HighestFieldId(), 13);
+    ASSERT_EQ(table_schema->PrimaryKeys(), primary_keys);
+    ASSERT_EQ(table_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(table_schema->Options(), options);
+}
+
+// Creating a schema from Arrow fields that all carry field-id metadata
+// (ids 0..4): the ids are expected to be kept, and the primary key fields
+// sub3/sub4 are expected to become non-nullable.
+TEST_F(TableSchemaTest, TestCreateWithAllFieldsHaveFieldId) {
+    arrow::FieldVector fields = MakeArrowField(0);
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> partition_keys = {"sub1", "sub2"};
+    std::vector<std::string> primary_keys = {"sub3", "sub4"};
+    std::map<std::string, std::string> options;
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_TRUE(table_schema);
+
+    std::vector<DataField> expected_data_fields = {
+        DataField(0, arrow::field("sub1", arrow::date32())),
+        DataField(1, arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::type::NANO))),
+        DataField(2, arrow::field("sub3", arrow::decimal128(23, 5), 
/*nullable=*/false)),
+        DataField(3, arrow::field("sub4", arrow::binary(), 
/*nullable=*/false)),
+        DataField(4, arrow::field("sub5", arrow::binary()))};
+    ASSERT_EQ(table_schema->Fields(), expected_data_fields);
+    ASSERT_EQ(table_schema->Id(), 0);
+    ASSERT_EQ(table_schema->HighestFieldId(), 4);
+    ASSERT_EQ(table_schema->PrimaryKeys(), primary_keys);
+    ASSERT_EQ(table_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(table_schema->Options(), options);
+}
+
+// Create() is expected to fail when only some of the fields carry field-id
+// metadata: sub1..sub5 have ids 3..7 while the extra field f0 has none.
+TEST_F(TableSchemaTest, TestInvalidCreate) {
+    // partial fields have field id
+    arrow::FieldVector fields = MakeArrowField(3);
+    fields.push_back(arrow::field("f0", arrow::boolean()));
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> partition_keys = {"sub1", "sub2"};
+    std::vector<std::string> primary_keys = {"sub3", "sub4"};
+    std::map<std::string, std::string> options;
+    ASSERT_NOK(TableSchema::Create(/*schema_id=*/1, schema, partition_keys, 
primary_keys, options));
+}
+
+// Loads schema-0 of the append_09 test table (no primary keys) and checks the
+// parsed schema, the field names and the field lookup by name and by id,
+// including the error messages for unknown fields.
+TEST_F(TableSchemaTest, TestDeserializeAppendTableSchema) {
+    auto fs = std::make_shared<LocalFileSystem>();
+
+    std::string path = paimon::test::GetDataDir() + 
"/orc/append_09.db/append_09/schema/schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+    // Expected schema, assembled member by member.
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+
+    auto field1 = DataField(0, arrow::field("f0", arrow::utf8()));
+    auto field2 = DataField(1, arrow::field("f1", arrow::int32()));
+    auto field3 = DataField(2, arrow::field("f2", arrow::int32()));
+    auto field4 = DataField(3, arrow::field("f3", arrow::float64()));
+    expected_schema.fields_ = {field1, field2, field3, field4};
+
+    expected_schema.highest_field_id_ = 3;
+    expected_schema.partition_keys_ = {"f1"};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {
+        {"bucket", "2"}, {"bucket-key", "f2"}, {"manifest.format", "orc"}, 
{"file.format", "orc"}};
+    expected_schema.time_millis_ = 1721614341162;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2", "f3"}), 
result_schema->FieldNames());
+    ASSERT_EQ(field2, result_schema->GetField("f1").value());
+    ASSERT_EQ(field2, result_schema->GetField(1).value());
+    ASSERT_NOK_WITH_MSG(result_schema->GetField("non-exist"),
+                        "Get field non-exist failed: not exist in table 
schema");
+    ASSERT_NOK_WITH_MSG(result_schema->GetField(100),
+                        "Get field with id 100 failed: not exist in table 
schema");
+}
+
+// Loads schema-0 of the pk_09 test table (primary keys f0, f1, f2; partition
+// key f1) and checks the parsed schema as well as the trimmed primary keys
+// (f0, f2) and their field definitions.
+TEST_F(TableSchemaTest, TestDeserializePkTableSchema) {
+    auto fs = std::make_shared<LocalFileSystem>();
+
+    std::string path = paimon::test::GetDataDir() + 
"/orc/pk_09.db/pk_09/schema/schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+    // Expected schema, assembled member by member.
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+
+    auto field1 = DataField(0, arrow::field("f0", arrow::utf8(), 
/*nullable=*/false));
+    auto field2 = DataField(1, arrow::field("f1", arrow::int32(), 
/*nullable=*/false));
+    auto field3 = DataField(2, arrow::field("f2", arrow::int32(), 
/*nullable=*/false));
+    auto field4 = DataField(3, arrow::field("f3", arrow::float64(), 
/*nullable=*/true));
+    expected_schema.fields_ = {field1, field2, field3, field4};
+
+    expected_schema.highest_field_id_ = 3;
+    expected_schema.partition_keys_ = {"f1"};
+    expected_schema.primary_keys_ = {"f0", "f1", "f2"};
+    expected_schema.options_ = {{"bucket", "2"},
+                                {"bucket-key", "f2"},
+                                {"manifest.format", "orc"},
+                                {"file.format", "orc"},
+                                {"deletion-vectors.enabled", "true"},
+                                {"commit.force-compact", "true"}};
+    expected_schema.time_millis_ = 1725534144802;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2", "f3"}), 
result_schema->FieldNames());
+    // The partition key f1 is removed from the primary keys.
+    ASSERT_OK_AND_ASSIGN(auto trimmed_primary_keys, 
result_schema->TrimmedPrimaryKeys());
+    ASSERT_EQ(trimmed_primary_keys, std::vector<std::string>({"f0", "f2"}));
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_primary_key_fields,
+                         result_schema->GetFields(trimmed_primary_keys));
+    ASSERT_EQ(2, trimmed_primary_key_fields.size());
+    ASSERT_EQ(trimmed_primary_key_fields[0], field1);
+    ASSERT_EQ(trimmed_primary_key_fields[1], field3);
+}
+
+// With partition key "pt" and primary keys "pt", "id", all three trimmed
+// primary key accessors (names, fields, Arrow schema) are expected to yield
+// only the non-nullable "id".
+TEST_F(TableSchemaTest, TestTrimmedPrimaryKeyInterfaces) {
+    arrow::FieldVector fields = {
+        arrow::field("pt", arrow::utf8()),
+        arrow::field("id", arrow::int64()),
+        arrow::field("val", arrow::int32()),
+    };
+    auto schema = arrow::schema(fields);
+
+    ASSERT_OK_AND_ASSIGN(auto table_schema, 
TableSchema::Create(/*schema_id=*/0, schema,
+                                                                
/*partition_keys=*/{"pt"},
+                                                                
/*primary_keys=*/{"pt", "id"},
+                                                                
/*options=*/{}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<std::string> trimmed_primary_keys,
+                         table_schema->TrimmedPrimaryKeys());
+    ASSERT_EQ(trimmed_primary_keys, std::vector<std::string>({"id"}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_primary_key_fields,
+                         table_schema->TrimmedPrimaryKeyFields());
+    ASSERT_EQ(trimmed_primary_key_fields.size(), 1);
+    ASSERT_EQ(trimmed_primary_key_fields[0].Name(), "id");
+    ASSERT_FALSE(trimmed_primary_key_fields[0].Nullable());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Schema> 
trimmed_primary_key_schema,
+                         table_schema->TrimmedPrimaryKeySchema());
+    ASSERT_EQ(trimmed_primary_key_schema->num_fields(), 1);
+    ASSERT_EQ(trimmed_primary_key_schema->field(0)->name(), "id");
+    ASSERT_FALSE(trimmed_primary_key_schema->field(0)->nullable());
+}
+
+// Without primary keys, all three trimmed primary key accessors are expected
+// to succeed and return empty results.
+TEST_F(TableSchemaTest, TestTrimmedPrimaryKeyInterfacesWithoutPrimaryKeys) {
+    arrow::FieldVector fields = {
+        arrow::field("pt", arrow::utf8()),
+        arrow::field("val", arrow::int32()),
+    };
+    auto schema = arrow::schema(fields);
+
+    ASSERT_OK_AND_ASSIGN(auto table_schema, 
TableSchema::Create(/*schema_id=*/0, schema,
+                                                                
/*partition_keys=*/{"pt"},
+                                                                
/*primary_keys=*/{},
+                                                                
/*options=*/{}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<std::string> trimmed_primary_keys,
+                         table_schema->TrimmedPrimaryKeys());
+    ASSERT_TRUE(trimmed_primary_keys.empty());
+
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_primary_key_fields,
+                         table_schema->TrimmedPrimaryKeyFields());
+    ASSERT_TRUE(trimmed_primary_key_fields.empty());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Schema> 
trimmed_primary_key_schema,
+                         table_schema->TrimmedPrimaryKeySchema());
+    ASSERT_EQ(trimmed_primary_key_schema->num_fields(), 0);
+}
+
+// Loads a schema whose fields are timestamps in four precisions, once without
+// and once with a time zone. The zoned fields are expected to carry the local
+// time zone name returned by DateTimeUtils::GetLocalTimezoneName().
+TEST_F(TableSchemaTest, TestDeserializeAppendTableSchemaWithTimestamp) {
+    auto fs = std::make_shared<LocalFileSystem>();
+
+    std::string path = paimon::test::GetDataDir() +
+                       
"/orc/append_with_multiple_ts_precision_and_timezone.db/"
+                       
"append_with_multiple_ts_precision_and_timezone/schema/schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+    // Expected schema, assembled member by member.
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    std::vector<DataField> fields = {
+        DataField(0, arrow::field("ts_sec", 
arrow::timestamp(arrow::TimeUnit::SECOND))),
+        DataField(1, arrow::field("ts_milli", 
arrow::timestamp(arrow::TimeUnit::MILLI))),
+        DataField(2, arrow::field("ts_micro", 
arrow::timestamp(arrow::TimeUnit::MICRO))),
+        DataField(3, arrow::field("ts_nano", 
arrow::timestamp(arrow::TimeUnit::NANO))),
+        DataField(4,
+                  arrow::field("ts_tz_sec", 
arrow::timestamp(arrow::TimeUnit::SECOND, timezone))),
+        DataField(5,
+                  arrow::field("ts_tz_milli", 
arrow::timestamp(arrow::TimeUnit::MILLI, timezone))),
+        DataField(6,
+                  arrow::field("ts_tz_micro", 
arrow::timestamp(arrow::TimeUnit::MICRO, timezone))),
+        DataField(7,
+                  arrow::field("ts_tz_nano", 
arrow::timestamp(arrow::TimeUnit::NANO, timezone)))};
+    expected_schema.fields_ = fields;
+
+    expected_schema.highest_field_id_ = 7;
+    expected_schema.partition_keys_ = {};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {{"bucket", "-1"},
+                                {"manifest.format", "orc"},
+                                {"file.format", "orc"},
+                                {"orc.timestamp-ltz.legacy.type", "false"}};
+    expected_schema.time_millis_ = 1757927622210;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+
+    ASSERT_EQ(std::vector<std::string>({"ts_sec", "ts_milli", "ts_micro", 
"ts_nano", "ts_tz_sec",
+                                        "ts_tz_milli", "ts_tz_micro", 
"ts_tz_nano"}),
+              result_schema->FieldNames());
+}
+
+// Loads a schema with nested types: a struct (f0), a list of structs (f1) and
+// a map from struct to struct (f2). Every nested struct is the five-field
+// layout from MakeArrowField(), so the field ids run from 0 up to 22.
+TEST_F(TableSchemaTest, TestDeserializeWithNestedDataType) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string path = paimon::test::GetDataDir() +
+                       
"/orc/append_table_with_nested_type.db/append_table_with_nested_type/schema/"
+                       "schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+    // Expected schema, assembled member by member.
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+
+    auto field1 = DataField(0, arrow::field("f0", 
arrow::struct_(MakeArrowField(1))));
+    auto field2 = DataField(6, arrow::field("f1", 
arrow::list(arrow::struct_(MakeArrowField(7)))));
+    auto field3 = DataField(12, arrow::field("f2", 
arrow::map(arrow::struct_(MakeArrowField(13)),
+                                                              
arrow::struct_(MakeArrowField(18)))));
+    expected_schema.fields_ = {field1, field2, field3};
+
+    expected_schema.highest_field_id_ = 22;
+    expected_schema.partition_keys_ = {};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {{"manifest.format", "orc"}, {"file.format", 
"orc"}};
+    expected_schema.time_millis_ = 1729759141146;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2"}), 
result_schema->FieldNames());
+}
+
+TEST_F(TableSchemaTest, TestEmptyBucketKey) {  // bucket keys when "bucket-key" is empty or absent
+    // An empty "bucket-key" falls back to the trimmed primary keys: {f1, f2} without partition f1.
+    {
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "bucket-key" : "",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> schema_result,
+                             TableSchema::CreateFromJson(table_schema_str));
+        ASSERT_EQ(std::vector<std::string>({"f2"}), 
schema_result->bucket_keys_);
+    }
+    {  // Same schema with no "bucket-key" option at all: the expected bucket keys are again {f2}.
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> schema_result,
+                             TableSchema::CreateFromJson(table_schema_str));
+        ASSERT_EQ(std::vector<std::string>({"f2"}), 
schema_result->bucket_keys_);
+    }
+}
+
+TEST_F(TableSchemaTest, TestToJson) {  // Create() from four flat fields, then ToJsonString()
+    auto field1 = DataField(0, arrow::field("f0", arrow::utf8()));
+    auto field2 = DataField(1, arrow::field("f1", arrow::int32()));
+    auto field3 = DataField(2, arrow::field("f2", arrow::int32()));
+    auto field4 = DataField(3, arrow::field("f3", arrow::float64()));
+    auto data_fields = {field1, field2, field3, field4};
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto schema, TableSchema::Create(/*schema_id=*/0, arrow_schema, 
/*partition_keys=*/{"f1"},
+                                         /*primary_keys=*/{},
+                                         {{"manifest.format", "orc"}, 
{"file.format", "orc"}}));
+    schema->time_millis_ = 1721614341162;  // pin to the "timeMillis" of the expected JSON below
+    ASSERT_OK_AND_ASSIGN(std::string json_str, schema->ToJsonString());
+    std::string table_schema_str = R"({
+    "version": 3,
+    "id": 0,
+    "fields": [
+        {
+            "id": 0,
+            "name": "f0",
+            "type": "STRING"
+        },
+        {
+            "id": 1,
+            "name": "f1",
+            "type": "INT"
+        },
+        {
+            "id": 2,
+            "name": "f2",
+            "type": "INT"
+        },
+        {
+            "id": 3,
+            "name": "f3",
+            "type": "DOUBLE"
+        }
+    ],
+    "highestFieldId": 3,
+    "partitionKeys": [
+        "f1"
+    ],
+    "primaryKeys": [],
+    "options": {
+        "file.format": "orc",
+        "manifest.format": "orc"
+    },
+    "timeMillis": 1721614341162
+})";
+    ASSERT_EQ(ReplaceAll(table_schema_str), ReplaceAll(json_str));  // ReplaceAll: defined elsewhere
+}
+
+TEST_F(TableSchemaTest, TestToJson2) {  // JSON -> TableSchema -> JSON round trip
+    std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "file.format" : "orc",
+                "manifest.format" : "orc"
+        },
+        "comment" : "this is a comment",
+        "timeMillis" : 1721614341162
+    })";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> schema_result,
+                         TableSchema::CreateFromJson(table_schema_str));
+    ASSERT_OK_AND_ASSIGN(std::string json_str, schema_result->ToJsonString());  // serialize back
+    ASSERT_EQ(ReplaceAll(table_schema_str), ReplaceAll(json_str));  // input (incl. "comment") kept
+}
+
+TEST_F(TableSchemaTest, TestToJson3) {  // schema file with MAP/ARRAY/ROW columns -> ToJsonString()
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string path =
+        paimon::test::GetDataDir() +
+        
"/orc/append_complex_build_in_fieldid.db/append_complex_build_in_fieldid/schema/"
+        "schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+    // Parse the test-data schema file and serialize it again; the result must match expected_str.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> st, 
TableSchema::CreateFromJson(content));
+    ASSERT_OK_AND_ASSIGN(std::string json_str, st->ToJsonString());
+    std::string expected_str = R"==({
+    "version": 3,
+    "id": 0,
+    "fields": [
+        {
+            "id": 0,
+            "name": "f1",
+            "type": {
+                "type": "MAP",
+                "key": "TINYINT NOT NULL",
+                "value": "SMALLINT"
+            }
+        },
+        {
+            "id": 1,
+            "name": "f2",
+            "type": {
+                "type": "ARRAY",
+                "element": "FLOAT"
+            }
+        },
+        {
+            "id": 2,
+            "name": "f3",
+            "type": {
+                "type": "ROW",
+                "fields": [
+                    {
+                        "id": 3,
+                        "name": "f0",
+                        "type": "BOOLEAN"
+                    },
+                    {
+                        "id": 4,
+                        "name": "f1",
+                        "type": "BIGINT"
+                    }
+                ]
+            }
+        },
+        {
+            "id": 5,
+            "name": "f4",
+            "type": "TIMESTAMP(9)"
+        },
+        {
+            "id": 6,
+            "name": "f5",
+            "type": "DATE"
+        },
+        {
+            "id": 7,
+            "name": "f6",
+            "type": "DECIMAL(2, 2)"
+        }
+    ],
+    "highestFieldId": 7,
+    "partitionKeys": [],
+    "primaryKeys": [],
+    "options": {
+        "file.format": "orc",
+        "manifest.format": "orc"
+    },
+    "timeMillis": 1732079677062
+})==";
+    ASSERT_EQ(ReplaceAll(expected_str), ReplaceAll(json_str));  // ReplaceAll: defined elsewhere
+}
+
+TEST_F(TableSchemaTest, TestToJsonWithNestedField0) {  // Create() from nested DataFields -> JSON
+    auto map_type = arrow::map(arrow::int8(), arrow::int16());
+    auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField(
+        DataField(536871936, arrow::field("item", arrow::float32()))));  // item id not in JSON below
+    std::vector<DataField> struct_fields = {DataField(3, arrow::field("f0", 
arrow::boolean())),
+                                            DataField(4, arrow::field("f1", 
arrow::int64()))};
+    auto struct_type = 
DataField::ConvertDataFieldsToArrowStructType(struct_fields);
+    auto data_fields = {DataField(0, arrow::field("f1", map_type)),
+                        DataField(1, arrow::field("f2", list_type)),
+                        DataField(2, arrow::field("f3", struct_type)),
+                        DataField(5, arrow::field("f4", 
arrow::timestamp(arrow::TimeUnit::NANO))),
+                        DataField(6, arrow::field("f5", arrow::date32())),
+                        DataField(7, arrow::field("f6", arrow::decimal128(2, 
2)))};
+    // Top-level ids are 0, 1, 2, 5, 6, 7; ids 3 and 4 belong to the children of struct f3.
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    ASSERT_OK_AND_ASSIGN(auto schema,
+                         TableSchema::Create(/*schema_id=*/0, arrow_schema, 
/*partition_keys=*/{},
+                                             /*primary_keys=*/{},
+                                             {{"manifest.format", "orc"}, 
{"file.format", "orc"}}));
+    // Pin the creation time to the "timeMillis" value of the expected JSON below.
+    schema->time_millis_ = 1721614341162;
+    ASSERT_OK_AND_ASSIGN(std::string json_str, schema->ToJsonString());
+    std::string expected_schema = R"==({
+    "version": 3,
+    "id": 0,
+    "fields": [
+        {
+            "id": 0,
+            "name": "f1",
+            "type": {
+                "type": "MAP",
+                "key": "TINYINT NOT NULL",
+                "value": "SMALLINT"
+            }
+        },
+        {
+            "id": 1,
+            "name": "f2",
+            "type": {
+                "type": "ARRAY",
+                "element": "FLOAT"
+            }
+        },
+        {
+            "id": 2,
+            "name": "f3",
+            "type": {
+                "type": "ROW",
+                "fields": [
+                    {
+                        "id": 3,
+                        "name": "f0",
+                        "type": "BOOLEAN"
+                    },
+                    {
+                        "id": 4,
+                        "name": "f1",
+                        "type": "BIGINT"
+                    }
+                ]
+            }
+        },
+        {
+            "id": 5,
+            "name": "f4",
+            "type": "TIMESTAMP(9)"
+        },
+        {
+            "id": 6,
+            "name": "f5",
+            "type": "DATE"
+        },
+        {
+            "id": 7,
+            "name": "f6",
+            "type": "DECIMAL(2, 2)"
+        }
+    ],
+    "highestFieldId": 7,
+    "partitionKeys": [],
+    "primaryKeys": [],
+    "options": {
+        "file.format": "orc",
+        "manifest.format": "orc"
+    },
+    "timeMillis": 1721614341162
+})==";
+    ASSERT_EQ(ReplaceAll(expected_schema), ReplaceAll(json_str));  // ReplaceAll: defined elsewhere
+}
+
+TEST_F(TableSchemaTest, TestToJsonWithNestedField1) {  // nested schema: from file and from Arrow
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string path = paimon::test::GetDataDir() +
+                       
"/orc/append_table_with_nested_type.db/append_table_with_nested_type/schema/"
+                       "schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+    // Part 1: parse the test-data schema file and serialize it again.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> st, 
TableSchema::CreateFromJson(content));
+    ASSERT_OK_AND_ASSIGN(std::string json_str, st->ToJsonString());
+    std::string expected_str = R"==({
+    "version": 3,
+    "id": 0,
+    "fields": [
+        {
+            "id": 0,
+            "name": "f0",
+            "type": {
+                "type": "ROW",
+                "fields": [
+                    {
+                        "id": 1,
+                        "name": "sub1",
+                        "type": "DATE"
+                    },
+                    {
+                        "id": 2,
+                        "name": "sub2",
+                        "type": "TIMESTAMP(9)"
+                    },
+                    {
+                        "id": 3,
+                        "name": "sub3",
+                        "type": "DECIMAL(23, 5)"
+                    },
+                    {
+                        "id": 4,
+                        "name": "sub4",
+                        "type": "BYTES"
+                    },
+                    {
+                        "id": 5,
+                        "name": "sub5",
+                        "type": "BYTES"
+                    }
+                ]
+            }
+        },
+        {
+            "id": 6,
+            "name": "f1",
+            "type": {
+                "type": "ARRAY",
+                "element": {
+                    "type": "ROW",
+                    "fields": [
+                        {
+                            "id": 7,
+                            "name": "sub1",
+                            "type": "DATE"
+                        },
+                        {
+                            "id": 8,
+                            "name": "sub2",
+                            "type": "TIMESTAMP(9)"
+                        },
+                        {
+                            "id": 9,
+                            "name": "sub3",
+                            "type": "DECIMAL(23, 5)"
+                        },
+                        {
+                            "id": 10,
+                            "name": "sub4",
+                            "type": "BYTES"
+                        },
+                        {
+                            "id": 11,
+                            "name": "sub5",
+                            "type": "BYTES"
+                        }
+                    ]
+                }
+            }
+        },
+        {
+            "id": 12,
+            "name": "f2",
+            "type": {
+                "type": "MAP",
+                "key": {
+                    "type": "ROW NOT NULL",
+                    "fields": [
+                        {
+                            "id": 13,
+                            "name": "sub1",
+                            "type": "DATE"
+                        },
+                        {
+                            "id": 14,
+                            "name": "sub2",
+                            "type": "TIMESTAMP(9)"
+                        },
+                        {
+                            "id": 15,
+                            "name": "sub3",
+                            "type": "DECIMAL(23, 5)"
+                        },
+                        {
+                            "id": 16,
+                            "name": "sub4",
+                            "type": "BYTES"
+                        },
+                        {
+                            "id": 17,
+                            "name": "sub5",
+                            "type": "BYTES"
+                        }
+                    ]
+                },
+                "value": {
+                    "type": "ROW",
+                    "fields": [
+                        {
+                            "id": 18,
+                            "name": "sub1",
+                            "type": "DATE"
+                        },
+                        {
+                            "id": 19,
+                            "name": "sub2",
+                            "type": "TIMESTAMP(9)"
+                        },
+                        {
+                            "id": 20,
+                            "name": "sub3",
+                            "type": "DECIMAL(23, 5)"
+                        },
+                        {
+                            "id": 21,
+                            "name": "sub4",
+                            "type": "BYTES"
+                        },
+                        {
+                            "id": 22,
+                            "name": "sub5",
+                            "type": "BYTES"
+                        }
+                    ]
+                }
+            }
+        }
+    ],
+    "highestFieldId": 22,
+    "partitionKeys": [],
+    "primaryKeys": [],
+    "options": {
+        "file.format": "orc",
+        "manifest.format": "orc"
+    },
+    "timeMillis": 1729759141146
+})==";
+
+    ASSERT_EQ(ReplaceAll(expected_str), ReplaceAll(json_str));
+    // Part 2: build the same layout from plain Arrow fields (no ids given); Create() must assign them.
+    auto f0 = arrow::field(
+        "f0", arrow::struct_({arrow::field("sub1", arrow::date32()),
+                              arrow::field("sub2", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                              arrow::field("sub3", arrow::decimal128(23, 5)),
+                              arrow::field("sub4", arrow::binary()),
+                              arrow::field("sub5", arrow::binary())}));
+    auto f1 = arrow::field("f1", arrow::list(f0));
+    auto f2 = arrow::field("f2", arrow::map(f0->type(), f0->type()));
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto arrow_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto new_table_schema,
+        TableSchema::Create(0, arrow_schema, /*partition_keys=*/{}, 
/*primary_keys=*/{},
+                            /*options=*/{{"file.format", "orc"}, 
{"manifest.format", "orc"}}));
+    new_table_schema->time_millis_ = 1729759141146;  // same "timeMillis" as expected_str
+    ASSERT_OK_AND_ASSIGN(std::string new_json_str, 
new_table_schema->ToJsonString());
+    ASSERT_EQ(expected_str, new_json_str) << new_json_str;  // exact compare, without ReplaceAll
+}
+
+TEST_F(TableSchemaTest, TestInvalidSchema) {  // CreateFromJson() must reject these four schemas
+    {
+        // Case 1: the partition keys are identical to the primary keys.
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1", "f2" ],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "bucket-key" : "f2",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_NOK_WITH_MSG(TableSchema::CreateFromJson(table_schema_str),
+                            "this will result in only one record in a 
partition");
+    }
+    {
+        // Case 2: the bucket key names a column that does not exist in the fields.
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "bucket-key" : "non-exist",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_NOK_WITH_MSG(TableSchema::CreateFromJson(table_schema_str),
+                            "should contain all bucket keys");
+    }
+    {
+        // Case 3: one of the bucket keys (f1) is also a partition key.
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "bucket-key" : "f1,f3",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_NOK_WITH_MSG(TableSchema::CreateFromJson(table_schema_str),
+                            "should not in partition keys");
+    }
+    {
+        // Case 4: the bucket key (f3) is not a subset of the primary keys; same message as case 2.
+        std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : "STRING"
+        }, {
+                "id" : 1,
+                "name" : "f1",
+                "type" : "INT"
+        }, {
+                "id" : 2,
+                "name" : "f2",
+                "type" : "INT"
+        }, {
+                "id" : 3,
+                "name" : "f3",
+                "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ "f1"],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+                "bucket" : "2",
+                "bucket-key" : "f3",
+                "manifest.format" : "orc",
+                "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+        ASSERT_NOK_WITH_MSG(TableSchema::CreateFromJson(table_schema_str),
+                            "should contain all bucket keys");
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdBasicType) {  // AssignFieldIdsRecursively on a primitive field
+    auto field = arrow::field("column1", arrow::int32());
+    {  // set_field_id=true: the field gets FIELD_ID "0" and the counter advances to 1.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(field, 
/*set_field_id=*/true, &field_id));
+        ASSERT_TRUE(new_field->metadata());
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        ASSERT_EQ(field_id, 1);
+    }
+    {  // set_field_id=false: no metadata is attached and the counter stays at 0.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(field, 
/*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        ASSERT_EQ(field_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdStructType) {  // AssignFieldIdsRecursively on a two-child struct
+    auto child1 = arrow::field("child1", arrow::int32());
+    auto child2 = arrow::field("child2", arrow::float64());
+    auto struct_field = arrow::field("parent", arrow::struct_({child1, 
child2}));
+    {  // set_field_id=true: parent gets "0", children get "1" and "2"; the counter ends at 3.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(struct_field, 
/*set_field_id=*/true, &field_id));
+        auto struct_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_EQ(struct_type->num_fields(), 2);
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        
ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        
ASSERT_EQ(struct_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "2");
+        ASSERT_EQ(field_id, 3);
+    }
+    {  // set_field_id=false: parent has no metadata, children still get "0" and "1".
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 struct_field, /*set_field_id=*/false, 
&field_id));
+        auto struct_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_EQ(struct_type->num_fields(), 2);
+        ASSERT_FALSE(new_field->metadata());
+        
ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "0");
+        
ASSERT_EQ(struct_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        ASSERT_EQ(field_id, 2);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdListType) {  // AssignFieldIdsRecursively on a list of int32
+    auto value_field = arrow::field("values", arrow::int32());
+    auto list_field = arrow::field("list_column", arrow::list(value_field));
+    {  // set_field_id=true: only the list field gets an id; its value field has no metadata.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(list_field, 
/*set_field_id=*/true, &field_id));
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto list_type = 
arrow::internal::checked_pointer_cast<arrow::ListType>(new_field->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+    {  // set_field_id=false: neither the list nor its value field gets an id; counter stays 0.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(list_field, 
/*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto list_type = 
arrow::internal::checked_pointer_cast<arrow::ListType>(new_field->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdMapType) {  // AssignFieldIdsRecursively on map<utf8, int32>
+    auto map_field = arrow::field("map_column", arrow::map(arrow::utf8(), 
arrow::int32()));
+    {  // set_field_id=true: only the map field gets an id; key and item fields have no metadata.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, 
/*set_field_id=*/true, &field_id));
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto map_type = 
arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());
+        ASSERT_FALSE(value_field->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+    {  // set_field_id=false: nothing gets an id and the counter stays at 0.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, 
/*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto map_type = 
arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());
+        ASSERT_FALSE(value_field->metadata());
+        ASSERT_EQ(field_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdMapWithStruct) {  // map whose key and value are both structs
+    auto inner_child1 = arrow::field("inner1", arrow::int32());
+    auto inner_child2 = arrow::field("inner2", arrow::float64());
+    auto map_field =
+        arrow::field("map_column", arrow::map(arrow::struct_({inner_child1, 
inner_child2}),
+                                              arrow::struct_({inner_child1, 
inner_child2})));
+    {  // set_field_id=true: map "0", key-struct children "1","2", value-struct children "3","4".
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, 
/*set_field_id=*/true, &field_id));
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto map_type = 
arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());  // the key field itself carries no id
+        auto key_inner_type = 
std::static_pointer_cast<arrow::StructType>(key_field->type());
+        
ASSERT_EQ(key_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        
ASSERT_EQ(key_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "2");
+        ASSERT_FALSE(value_field->metadata());  // the item field itself carries no id
+        auto value_inner_type = 
std::static_pointer_cast<arrow::StructType>(value_field->type());
+        
ASSERT_EQ(value_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "3");
+        
ASSERT_EQ(value_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "4");
+        ASSERT_EQ(field_id, 5);
+    }
+    {  // set_field_id=false: the map gets no id; struct children are numbered "0".."3".
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, 
/*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto map_type = 
arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());
+        auto key_inner_type = 
std::static_pointer_cast<arrow::StructType>(key_field->type());
+        
ASSERT_EQ(key_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "0");
+        
ASSERT_EQ(key_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        ASSERT_FALSE(value_field->metadata());
+        auto value_inner_type = 
std::static_pointer_cast<arrow::StructType>(value_field->type());
+        
ASSERT_EQ(value_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "2");
+        
ASSERT_EQ(value_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "3");
+        ASSERT_EQ(field_id, 4);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdNestedStruct) {  // struct containing a struct with two children
+    auto inner_child1 = arrow::field("inner1", arrow::int32());
+    auto inner_child2 = arrow::field("inner2", arrow::float64());
+    auto inner_struct = arrow::field("inner_struct", 
arrow::struct_({inner_child1, inner_child2}));
+    auto outer_struct = arrow::field("outer_struct", 
arrow::struct_({inner_struct}));
+    {  // set_field_id=true: outer "0", inner struct "1", its children "2" and "3"; counter ends at 4.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(outer_struct, 
/*set_field_id=*/true, &field_id));
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto outer_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        
ASSERT_EQ(outer_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        auto inner_type = 
std::static_pointer_cast<arrow::StructType>(outer_type->field(0)->type());
+        
ASSERT_EQ(inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "2");
+        
ASSERT_EQ(inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "3");
+        ASSERT_EQ(field_id, 4);
+    }
+    {  // set_field_id=false: outer has no id; inner struct "0", its children "1" and "2".
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 outer_struct, /*set_field_id=*/false, 
&field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto outer_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        
ASSERT_EQ(outer_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "0");
+        auto inner_type = 
std::static_pointer_cast<arrow::StructType>(outer_type->field(0)->type());
+        
ASSERT_EQ(inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        
ASSERT_EQ(inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "2");
+        ASSERT_EQ(field_id, 3);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdNestedListInStruct) {  // struct whose only child is a list
+    auto list_value_field = arrow::field("list_values", arrow::int32());
+    auto list_field = arrow::field("list_column", 
arrow::list(list_value_field));
+    auto struct_field = arrow::field("struct_with_list", 
arrow::struct_({list_field}));
+    {  // set_field_id=true: struct "0", list child "1", list value field without metadata.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(struct_field, 
/*set_field_id=*/true, &field_id));
+        auto struct_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        
ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        
ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "1");
+        auto list_type =
+            
arrow::internal::checked_pointer_cast<arrow::ListType>(struct_type->field(0)->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 2);
+    }
+    {  // set_field_id=false: struct has no id, list child "0", list value field without metadata.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 struct_field, /*set_field_id=*/false, 
&field_id));
+        auto struct_type = 
std::static_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_FALSE(new_field->metadata());
+        
ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
 "0");
+        auto list_type =
+            
arrow::internal::checked_pointer_cast<arrow::ListType>(struct_type->field(0)->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+}
+
+TEST_F(TableSchemaTest, MapKeyMustBeNotNull) {  // CreateFromJson is expected to reject this schema, whose single field f0 is a MAP, with a nullable-key error.
+    std::string table_schema_str = R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+                "id" : 0,
+                "name" : "f0",
+                "type" : {
+                    "type": "MAP",
+                    "key": "TINYINT",
+                    "value": "SMALLINT"
+                }
+        } ],
+        "highestFieldId" : 0,
+        "partitionKeys" : [],
+        "primaryKeys" : [],
+        "options" : {},
+        "timeMillis" : 1721614341162
+    })";  // NOTE(review): presumably the bare "TINYINT" key type is parsed as nullable -- confirm against the type parser
+    ASSERT_NOK_WITH_MSG(TableSchema::CreateFromJson(table_schema_str),
+                        "Map field 'f0' has a nullable key.");  // failure message expected from the parse
+}
+
+TEST_F(TableSchemaTest, CrossPartitionUpdate) {  // Primary keys that omit one partition key: CrossPartitionUpdate() is expected to be true.
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int8()),    arrow::field("f3", 
arrow::int16()),
+        arrow::field("f4", arrow::int16()),   arrow::field("f5", 
arrow::int32())};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f1", "f2"};  // does not include partition key f3
+    std::vector<std::string> partition_keys = {"f2", "f3"};
+    std::map<std::string, std::string> options;  // left empty: no table options are set
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_TRUE(table_schema);  // Create must yield a non-null schema
+    ASSERT_TRUE(table_schema->CrossPartitionUpdate());
+}
+
+TEST_F(TableSchemaTest, CrossPartitionUpdate2) {  // Primary keys that include every partition key: CrossPartitionUpdate() is expected to be false.
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int8()),    arrow::field("f3", 
arrow::int16()),
+        arrow::field("f4", arrow::int16()),   arrow::field("f5", 
arrow::int32())};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f1", "f2", "f3"};  // includes both partition keys f2 and f3
+    std::vector<std::string> partition_keys = {"f2", "f3"};
+    std::map<std::string, std::string> options;  // left empty: no table options are set
+    ASSERT_OK_AND_ASSIGN(
+        auto table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_TRUE(table_schema);  // Create must yield a non-null schema
+    ASSERT_FALSE(table_schema->CrossPartitionUpdate());
+}
+
+}  // namespace paimon::test

Reply via email to