This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ab1b2a  feat(schema): add schema validation utilities (#55)
6ab1b2a is described below

commit 6ab1b2a260dc5a9b46d6b0236cf5f561b866c6a5
Author: Yonghao Fang <[email protected]>
AuthorDate: Mon Jun 8 15:20:36 2026 +0800

    feat(schema): add schema validation utilities (#55)
    
    * feat(schema): add schema validation utilities
    
    * fix
---
 src/paimon/core/schema/arrow_schema_validator.cpp  | 266 +++++++
 src/paimon/core/schema/arrow_schema_validator.h    |  64 ++
 .../core/schema/arrow_schema_validator_test.cpp    | 348 +++++++++
 src/paimon/core/schema/schema_validation.cpp       | 510 +++++++++++++
 src/paimon/core/schema/schema_validation.h         |  79 ++
 src/paimon/core/schema/schema_validation_test.cpp  | 811 +++++++++++++++++++++
 6 files changed, 2078 insertions(+)

diff --git a/src/paimon/core/schema/arrow_schema_validator.cpp 
b/src/paimon/core/schema/arrow_schema_validator.cpp
new file mode 100644
index 0000000..22be2de
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator.cpp
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/arrow_schema_validator.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class KeyValueMetadata;
+}  // namespace arrow
+
+namespace paimon {
+
+// Tells whether `data_type` is a container type, i.e. MAP, LIST or STRUCT.
+bool ArrowSchemaValidator::IsNestedType(const std::shared_ptr<arrow::DataType>& data_type) {
+    switch (data_type->id()) {
+        case arrow::Type::MAP:
+        case arrow::Type::LIST:
+        case arrow::Type::STRUCT:
+            return true;
+        default:
+            return false;
+    }
+}
+
+// Runs the structural checks on `schema` in a fixed order and returns the first failure:
+// duplicate field names, whitespace-only field names, then unsupported data types.
+Status ArrowSchemaValidator::ValidateSchema(const arrow::Schema& schema) {
+    const arrow::FieldVector& top_level_fields = schema.fields();
+    // Reject names that repeat within one nesting level.
+    PAIMON_RETURN_NOT_OK(ValidateNoRedundantFields(top_level_fields));
+    // Reject whitespace-only names.
+    PAIMON_RETURN_NOT_OK(ValidateNoWhitespaceOnlyFields(top_level_fields));
+    // Reject fields (or nested children) whose arrow type is not supported.
+    for (const std::shared_ptr<arrow::Field>& top_level_field : top_level_fields) {
+        PAIMON_RETURN_NOT_OK(ValidateField(top_level_field));
+    }
+    return Status::OK();
+}
+
+// Validates `schema` like ValidateSchema() and then checks the field ids carried by its
+// top-level and nested struct fields for uniqueness.
+Status ArrowSchemaValidator::ValidateSchemaWithFieldId(const arrow::Schema& schema) {
+    PAIMON_RETURN_NOT_OK(ValidateSchema(schema));
+    std::set<int32_t> seen_field_ids;
+    // Wrap the top-level fields into a struct type so that they go through the same
+    // id check as the children of nested structs.
+    const std::shared_ptr<arrow::DataType> root_type = arrow::struct_(schema.fields());
+    PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(root_type, /*key_value_metadata=*/nullptr,
+                                                     &seen_field_ids));
+    return Status::OK();
+}
+
+// Rejects field lists in which two fields of the same nesting level share a name.
+//
+// Names are compared per level: every recursive call into the children of a
+// MAP/LIST/STRUCT field starts with its own empty name set, so a nested field may
+// reuse a name that appears at another level.
+//
+// Returns Status::Invalid naming the first duplicate found, Status::OK() otherwise.
+Status ArrowSchemaValidator::ValidateNoRedundantFields(const arrow::FieldVector& fields) {
+    std::set<std::string> seen_names;
+    for (const auto& field : fields) {
+        // insert() reports through `.second` whether the name was new, so a single
+        // lookup both detects the duplicate and records the name.
+        if (!seen_names.insert(field->name()).second) {
+            return Status::Invalid(fmt::format(
+                "validate schema failed: read schema has duplicate field {}", field->name()));
+        }
+        if (IsNestedType(field->type())) {
+            PAIMON_RETURN_NOT_OK(ValidateNoRedundantFields(field->type()->fields()));
+        }
+    }
+    return Status::OK();
+}
+
+// Rejects field lists containing a field whose name StringUtils::IsNullOrWhitespaceOnly
+// flags, descending into the children of MAP/LIST/STRUCT fields.
+//
+// Returns Status::Invalid on the first offending field, Status::OK() otherwise.
+Status ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(const arrow::FieldVector& fields) {
+    for (const auto& field : fields) {
+        if (StringUtils::IsNullOrWhitespaceOnly(field->name())) {
+            // The message is a constant, so it is passed as-is instead of going
+            // through fmt::format with no arguments.
+            return Status::Invalid(
+                "validate schema failed: read schema has whitespace-only field");
+        }
+        if (IsNestedType(field->type())) {
+            PAIMON_RETURN_NOT_OK(ValidateNoWhitespaceOnlyFields(field->type()->fields()));
+        }
+    }
+    return Status::OK();
+}
+
+// Recursively walks `type`, rejecting unsupported arrow types and duplicate field ids.
+//
+// type:               data type to inspect.
+// key_value_metadata: metadata of the field that owns `type`; it is only read for
+//                     LARGE_BINARY, where BlobUtils::IsBlobMetadata decides whether the
+//                     type is accepted.
+// field_id_set:       ids collected so far. The same set is handed to every recursive
+//                     call, so an id must be unique across all struct levels, not only
+//                     among siblings.
+//
+// Only the children of STRUCT types have their id extracted and recorded; LIST values and
+// MAP keys/items are checked for their type only.
+//
+// Returns Status::OK() when everything is valid and Status::Invalid on a duplicate id or
+// an unsupported type. A failure of DataField::ConvertArrowFieldToDataField is passed on
+// through PAIMON_ASSIGN_OR_RAISE.
+Status ArrowSchemaValidator::ValidateDataTypeWithFieldId(
+    const std::shared_ptr<arrow::DataType>& type,
+    const std::shared_ptr<const arrow::KeyValueMetadata>& key_value_metadata,
+    std::set<int32_t>* field_id_set) {
+    switch (type->id()) {
+        case arrow::Type::type::BOOL:
+        case arrow::Type::type::INT8:
+        case arrow::Type::type::INT16:
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::INT64:
+        case arrow::Type::type::FLOAT:
+        case arrow::Type::type::DOUBLE:
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY:
+        case arrow::Type::type::DATE32:
+        case arrow::Type::type::DECIMAL128:
+        case arrow::Type::type::TIMESTAMP:
+            return Status::OK();
+        case arrow::Type::type::LIST: {
+            const auto& value_field =
+                arrow::internal::checked_cast<const arrow::BaseListType&>(*type).value_field();
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                value_field->type(), value_field->metadata(), field_id_set));
+            break;
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: copying the FieldVector would copy every shared_ptr in it.
+            const arrow::FieldVector& sub_fields =
+                arrow::internal::checked_cast<const arrow::StructType&>(*type).fields();
+            for (const auto& sub_field : sub_fields) {
+                PAIMON_ASSIGN_OR_RAISE(DataField data_field,
+                                       DataField::ConvertArrowFieldToDataField(sub_field));
+                // insert() tells through `.second` whether the id was new, so the
+                // duplicate check and the bookkeeping share one lookup.
+                if (!field_id_set->insert(data_field.Id()).second) {
+                    return Status::Invalid(fmt::format(
+                        "field id must be unique, duplicate field id {}", data_field.Id()));
+                }
+                PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                    sub_field->type(), sub_field->metadata(), field_id_set));
+            }
+            break;
+        }
+        case arrow::Type::type::MAP: {
+            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(*type);
+            const auto& key_field = map_type.key_field();
+            const auto& item_field = map_type.item_field();
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                key_field->type(), key_field->metadata(), field_id_set));
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                item_field->type(), item_field->metadata(), field_id_set));
+            break;
+        }
+        case arrow::Type::type::LARGE_BINARY: {
+            // LARGE_BINARY is accepted only when the owning field's metadata marks a blob.
+            if (BlobUtils::IsBlobMetadata(key_value_metadata)) {
+                break;
+            }
+            [[fallthrough]];
+        }
+        default: {
+            return Status::Invalid("Unknown or unsupported arrow type: ", type->ToString());
+        }
+    }
+    return Status::OK();
+}
+
+// Checks that `field`, and every field nested below it, uses a supported arrow type.
+//
+// Accepted as-is: BOOL, INT8/16/32/64, FLOAT, DOUBLE, STRING, BINARY, DATE32, TIMESTAMP.
+// DECIMAL128 must additionally pass DecimalUtils::CheckDecimalType.
+// LIST, STRUCT and MAP are accepted when all of their child fields are.
+// LARGE_BINARY is accepted only when BlobUtils::IsBlobField(field) holds.
+//
+// Returns Status::Invalid("Unknown or unsupported arrow type: ...") for any other type,
+// the failing status of a nested check, or Status::OK().
+Status ArrowSchemaValidator::ValidateField(const std::shared_ptr<arrow::Field>& field) {
+    const std::shared_ptr<arrow::DataType>& type = field->type();
+    switch (type->id()) {
+        case arrow::Type::type::BOOL:
+        case arrow::Type::type::INT8:
+        case arrow::Type::type::INT16:
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::INT64:
+        case arrow::Type::type::FLOAT:
+        case arrow::Type::type::DOUBLE:
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY:
+        case arrow::Type::type::DATE32:
+        case arrow::Type::type::TIMESTAMP:
+            break;
+        case arrow::Type::type::DECIMAL128:
+            PAIMON_RETURN_NOT_OK(DecimalUtils::CheckDecimalType(*type));
+            break;
+        case arrow::Type::type::LIST: {
+            const auto& value_field =
+                arrow::internal::checked_cast<const arrow::BaseListType&>(*type).value_field();
+            PAIMON_RETURN_NOT_OK(ValidateField(value_field));
+            break;
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: copying the FieldVector would copy every shared_ptr in it.
+            const arrow::FieldVector& sub_fields =
+                arrow::internal::checked_cast<const arrow::StructType&>(*type).fields();
+            for (const auto& sub_field : sub_fields) {
+                PAIMON_RETURN_NOT_OK(ValidateField(sub_field));
+            }
+            break;
+        }
+        case arrow::Type::type::MAP: {
+            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(*type);
+            const auto& key_field = map_type.key_field();
+            const auto& item_field = map_type.item_field();
+            PAIMON_RETURN_NOT_OK(ValidateField(key_field));
+            PAIMON_RETURN_NOT_OK(ValidateField(item_field));
+            break;
+        }
+        case arrow::Type::type::LARGE_BINARY: {
+            if (BlobUtils::IsBlobField(field)) {
+                break;
+            }
+            [[fallthrough]];
+        }
+        default: {
+            return Status::Invalid("Unknown or unsupported arrow type: ", type->ToString());
+        }
+    }
+    return Status::OK();
+}
+
+// Returns true when `type` is a TIMESTAMP with a non-empty timezone, or is a LIST, STRUCT
+// or MAP that contains such a timestamp at any depth. Every other type yields false.
+bool ArrowSchemaValidator::ContainTimestampWithTimezone(const arrow::DataType& type) {
+    switch (type.id()) {
+        case arrow::Type::type::LIST: {
+            const auto& value_field =
+                arrow::internal::checked_cast<const arrow::ListType&>(type).value_field();
+            return ContainTimestampWithTimezone(*value_field->type());
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: copying the FieldVector would copy every shared_ptr in it.
+            const arrow::FieldVector& sub_fields =
+                arrow::internal::checked_cast<const arrow::StructType&>(type).fields();
+            for (const auto& sub_field : sub_fields) {
+                if (ContainTimestampWithTimezone(*sub_field->type())) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        case arrow::Type::type::MAP: {
+            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(type);
+            // Either side of the map may hold the timestamp: check the key, then the item.
+            return ContainTimestampWithTimezone(*map_type.key_field()->type()) ||
+                   ContainTimestampWithTimezone(*map_type.item_field()->type());
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            const auto& ts_type = arrow::internal::checked_cast<const arrow::TimestampType&>(type);
+            return !ts_type.timezone().empty();
+        }
+        default: {
+            return false;
+        }
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/arrow_schema_validator.h 
b/src/paimon/core/schema/arrow_schema_validator.h
new file mode 100644
index 0000000..6118445
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <set>
+#include <vector>
+
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace arrow {
+class DataType;
+class Field;
+class Schema;
+class KeyValueMetadata;
+}  // namespace arrow
+
+namespace paimon {
+class PAIMON_EXPORT ArrowSchemaValidator {  // Static-only validation helpers for arrow schemas.
+ public:
+    ArrowSchemaValidator() = delete;  // Not instantiable: every member is static.
+    ~ArrowSchemaValidator() = delete;
+    static Status ValidateSchema(const arrow::Schema& schema);  // Checks field names, then types.
+    // Runs ValidateSchema(), then rejects field ids that repeat among struct fields.
+    static Status ValidateSchemaWithFieldId(const arrow::Schema& schema);
+    // Fails when two fields of the same nesting level share a name.
+    static Status ValidateNoRedundantFields(const arrow::FieldVector& fields);
+    // Fails when a field name, at any nesting level, is whitespace-only.
+    static Status ValidateNoWhitespaceOnlyFields(const arrow::FieldVector& 
fields);
+    // Fails when `field` or one of its nested children has an unsupported arrow type.
+    static Status ValidateField(const std::shared_ptr<arrow::Field>& field);
+    // True when `type` is, or contains at any depth, a timestamp with a non-empty timezone.
+    static bool ContainTimestampWithTimezone(const arrow::DataType& type);
+    // True for MAP, LIST and STRUCT types.
+    static bool IsNestedType(const std::shared_ptr<arrow::DataType>& 
data_type);
+    // Recursive worker of ValidateSchemaWithFieldId(); `field_id_set` accumulates the ids seen.
+ private:
+    static Status ValidateDataTypeWithFieldId(
+        const std::shared_ptr<arrow::DataType>& type,
+        const std::shared_ptr<const arrow::KeyValueMetadata>& 
key_value_metadata,
+        std::set<int32_t>* field_id_set);
+};
+}  // namespace paimon
diff --git a/src/paimon/core/schema/arrow_schema_validator_test.cpp 
b/src/paimon/core/schema/arrow_schema_validator_test.cpp
new file mode 100644
index 0000000..f1ec0e5
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator_test.cpp
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/arrow_schema_validator.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(ArrowSchemaValidatorTest, TestSimple) {  // Happy path for ValidateSchema().
+    auto col1_field = arrow::field("col1", arrow::int64());
+    auto col2_field = arrow::field("col2", arrow::int32());
+    auto col3_field = arrow::field("col3", arrow::int16());
+    auto col4_field = arrow::field("col4", arrow::int8());
+    auto col5_field = arrow::field("col5", arrow::float64());
+    auto col6_field = arrow::field("col6", arrow::float32());
+    auto col7_field = arrow::field("col7", arrow::boolean());
+    auto col8_field = arrow::field("col8", arrow::utf8());
+    auto col9_field = arrow::field("col9", arrow::binary());
+    auto col10_field = arrow::field("col10", arrow::date32());
+    auto col11_field = arrow::field("col11", arrow::decimal128(20, 4));
+    auto col12_field = arrow::field("col12", arrow::decimal128(18, 5));
+    auto col13_field = arrow::field("col13", arrow::list(arrow::int64()));
+    auto col14_field = arrow::field("col14", arrow::map(arrow::utf8(), 
arrow::int64()));
+    auto col15_field = arrow::field("col15", 
arrow::timestamp(arrow::TimeUnit::NANO));
+    auto col16_field = arrow::field(
+        "col16",
+        arrow::struct_({arrow::field("sub1", arrow::int8()), 
arrow::field("sub2", arrow::int16()),
+                        arrow::field("sub3", arrow::int64())}));
+    // A schema mixing primitive, decimal, list, map, timestamp and struct columns is accepted.
+    auto arrow_schema = arrow::schema(
+        arrow::FieldVector({col1_field, col2_field, col3_field, col4_field, 
col5_field, col6_field,
+                            col7_field, col8_field, col9_field, col10_field, 
col11_field,
+                            col12_field, col13_field, col14_field, 
col15_field, col16_field}));
+    ASSERT_OK(ArrowSchemaValidator::ValidateSchema(*arrow_schema));
+}
+
+TEST(ArrowSchemaValidatorTest, TestValidateNoRedundantFields) {  // Duplicate-name detection.
+    auto col1_field = arrow::field("col1", arrow::int64());
+    auto col2_field = arrow::field("col2", arrow::int32());
+    auto col3_field = arrow::field("col3", arrow::int16());
+    {  // Three distinct top-level names: accepted.
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field, 
col2_field, col3_field}));
+        
ASSERT_OK(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()));
+    }
+    {  // "col2" appears twice at the top level: rejected, message names the duplicate.
+        auto arrow_schema =
+            arrow::schema(arrow::FieldVector({col1_field, col2_field, 
col3_field, col2_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()),
+                            "validate schema failed: read schema has duplicate 
field col2");
+    }
+    {  // "sub1" appears twice inside a nested struct: rejected as well.
+        auto col4_field =
+            arrow::field("col4", arrow::struct_({arrow::field("sub1", 
arrow::int8()),
+                                                 arrow::field("sub1", 
arrow::int16())}));
+        auto arrow_schema =
+            arrow::schema(arrow::FieldVector({col1_field, col2_field, 
col3_field, col4_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()),
+                            "validate schema failed: read schema has duplicate 
field sub1");
+    }
+}
+
+TEST(ArrowSchemaValidatorTest, TestValidateNoWhitespaceOnlyFields) {  // Blank-name detection.
+    auto col1_field = arrow::field("col1", arrow::int64());
+    auto col2_field = arrow::field("col2", arrow::int32());
+    auto col3_field = arrow::field("col3", arrow::int16());
+    {  // Regular names only: accepted.
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field, 
col2_field, col3_field}));
+        
ASSERT_OK(ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()));
+    }
+    {  // A whitespace-only name at the top level: rejected.
+        auto col4_field = arrow::field(" ", arrow::int16());
+        auto arrow_schema =
+            arrow::schema(arrow::FieldVector({col1_field, col2_field, 
col3_field, col4_field}));
+        ASSERT_NOK_WITH_MSG(
+            
ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()),
+            "validate schema failed: read schema has whitespace-only field");
+    }
+    {  // A whitespace-only name inside a nested struct: rejected with the same message.
+        auto col4_field = arrow::field("col4", 
arrow::struct_({arrow::field("sub1", arrow::int8()),
+                                                               arrow::field(" 
", arrow::int16())}));
+        auto arrow_schema =
+            arrow::schema(arrow::FieldVector({col1_field, col2_field, 
col3_field, col4_field}));
+        ASSERT_NOK_WITH_MSG(
+            
ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()),
+            "validate schema failed: read schema has whitespace-only field");
+    }
+}
+
+TEST(ArrowSchemaValidatorTest, TestInvalidDataType) {  // Each case: one unsupported column type.
+    {  // large_utf8 is rejected.
+        auto col1_field = arrow::field("col1", arrow::large_utf8());
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: large_string");
+    }
+    {  // large_binary on a plain field (no metadata attached here) is rejected.
+        auto col1_field = arrow::field("col1", arrow::large_binary());
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: large_binary");
+    }
+    {  // Unsigned integers are rejected.
+        auto col1_field = arrow::field("col1", arrow::uint32());
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: uint32");
+    }
+    {  // Sparse unions are rejected; the message embeds the full type string.
+        auto union_type = arrow::sparse_union(
+            {arrow::field("_union_0", arrow::int32()), 
arrow::field("_union_1", arrow::utf8())});
+        auto col1_field = arrow::field("col1", union_type);
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: 
sparse_union<_union_0: int32=0, "
+                            "_union_1: string=1>");
+    }
+    {  // date64 is rejected (only date32 is in the accepted list).
+        auto col1_field = arrow::field("col1", arrow::date64());
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: date64[ms]");
+    }
+    {  // decimal256 is rejected (only decimal128 is in the accepted list).
+        auto col1_field = arrow::field("col1", arrow::decimal256(10, 5));
+        auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+                            "Unknown or unsupported arrow type: decimal256(10, 
5)");
+    }
+}
+
+TEST(ArrowSchemaValidatorTest, ValidateDataTypeWithFieldId) {  // Field-id uniqueness checks.
+    {  // Distinct ids (3, 0, 1) in arbitrary order: accepted.
+        std::vector<DataField> fields = {DataField(3, arrow::field("f3", 
arrow::float64())),
+                                         DataField(0, arrow::field("f0", 
arrow::utf8())),
+                                         DataField(1, arrow::field("f1", 
arrow::int32()))};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema));
+    }
+    {  // Struct and map-of-struct columns whose ids 0..8 are all distinct: accepted.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3, arrow::field("sub_f3", arrow::int32()))};
+        std::vector<DataField> sub_fields1 = {
+            DataField(5, arrow::field("sub_f5", arrow::float64())),
+            DataField(6, arrow::field("sub_f6", arrow::utf8())),
+            DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        DataField field1 = DataField(
+            4, arrow::field(
+                   "f1", arrow::map(arrow::utf8(),
+                                    
DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+        DataField field2 =
+            DataField(8, arrow::field("f2", arrow::map(arrow::int8(), 
arrow::int16())));
+        std::vector<DataField> fields = {field0, field1, field2};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema))
+            << 
ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema).ToString();
+    }
+    {  // Id 0 used by two top-level fields: rejected.
+        std::vector<DataField> fields = {DataField(0, arrow::field("f3", 
arrow::float64())),
+                                         DataField(0, arrow::field("f0", 
arrow::utf8())),
+                                         DataField(1, arrow::field("f1", 
arrow::int32()))};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+                            "field id must be unique, duplicate field id 0");
+    }
+    {  // Plain arrow fields built without DataField, i.e. without field-id metadata: rejected.
+        arrow::FieldVector fields = {arrow::field("f3", arrow::float64()),
+                                     arrow::field("f0", arrow::utf8()),
+                                     arrow::field("f1", arrow::int32())};
+        auto arrow_schema = arrow::schema(fields);
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+                            "invalid read schema, lack of metadata of field 
id");
+    }
+    {  // Struct and list-of-struct columns with distinct ids: accepted.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3, arrow::field("sub_f3", arrow::int32()))};
+        std::vector<DataField> sub_fields1 = {
+            DataField(5, arrow::field("sub_f5", arrow::float64())),
+            DataField(6, arrow::field("sub_f6", arrow::utf8())),
+            DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        DataField field1 = DataField(
+            4, arrow::field(
+                   "f1", 
arrow::list(DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+        DataField field2 =
+            DataField(8, arrow::field("f2", arrow::map(arrow::int8(), 
arrow::int16())));
+        std::vector<DataField> fields = {field0, field1, field2};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema))
+            << 
ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema).ToString();
+    }
+    {  // Nested sub_f6 reuses id 4 of the top-level field f1: ids are unique across levels.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3, arrow::field("sub_f3", arrow::int32()))};
+        std::vector<DataField> sub_fields1 = {
+            DataField(5, arrow::field("sub_f5", arrow::float64())),
+            DataField(4, arrow::field("sub_f6", arrow::utf8())),
+            DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        DataField field1 = DataField(
+            4, arrow::field(
+                   "f1", 
arrow::list(DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+        DataField field2 =
+            DataField(8, arrow::field("f2", arrow::map(arrow::int8(), 
arrow::int16())));
+        std::vector<DataField> fields = {field0, field1, field2};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+                            "field id must be unique, duplicate field id 4");
+    }
+    {  // List-of-struct whose children are plain arrow fields without id metadata: rejected.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3, arrow::field("sub_f3", arrow::int32()))};
+        arrow::FieldVector invalid_sub_fields = {arrow::field("sub_f4", 
arrow::float64()),
+                                                 arrow::field("sub_f5", 
arrow::utf8()),
+                                                 arrow::field("sub_f6", 
arrow::int32())};
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        DataField field1 =
+            DataField(4, arrow::field("f1", 
arrow::list(arrow::struct_(invalid_sub_fields))));
+        DataField field2 =
+            DataField(5, arrow::field("f2", arrow::map(arrow::int8(), 
arrow::int16())));
+        std::vector<DataField> fields = {field0, field1, field2};
+        auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+        
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+                            "invalid read schema, lack of metadata of field 
id");
+    }
+    {  // Unsupported child type. NOTE(review): calls a member the header declares private; presumably tests build with access checks relaxed - confirm.
+        std::vector<DataField> fields = {DataField(0, arrow::field("f0", 
arrow::float64())),
+                                         DataField(1, arrow::field("f1", 
arrow::large_utf8())),
+                                         DataField(2, arrow::field("f2", 
arrow::int32()))};
+        auto struct_type = 
DataField::ConvertDataFieldsToArrowStructType(fields);
+        std::set<int32_t> field_id_set;
+        ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateDataTypeWithFieldId(
+                                struct_type, /*key_value_metadata=*/nullptr, 
&field_id_set),
+                            "Unknown or unsupported arrow type: large_string");
+    }
+}
+
+TEST(ArrowSchemaValidatorTest, ContainTimestampWithTimezone) {  // Timezone detection at any depth.
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    {  // Timestamp column created without a timezone: false.
+        std::vector<DataField> fields = {
+            DataField(0, arrow::field("f0", arrow::float64())),
+            DataField(1, arrow::field("f1", arrow::utf8())),
+            DataField(2, arrow::field("f2", 
arrow::timestamp(arrow::TimeUnit::NANO)))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_FALSE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Timestamp column created with `timezone`: true.
+        std::vector<DataField> fields = {
+            DataField(0, arrow::field("f0", arrow::float64())),
+            DataField(1, arrow::field("f1", arrow::utf8())),
+            DataField(2, arrow::field("f2", 
arrow::timestamp(arrow::TimeUnit::NANO, timezone)))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Timezone-carrying timestamp inside a nested struct: true.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3,
+                      arrow::field("sub_f3", 
arrow::timestamp(arrow::TimeUnit::NANO, timezone)))};
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        std::vector<DataField> fields = {field0, DataField(4, 
arrow::field("f1", arrow::utf8()))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Nested struct without any timestamp: false.
+        std::vector<DataField> sub_fields0 = {
+            DataField(1, arrow::field("sub_f1", arrow::float64())),
+            DataField(2, arrow::field("sub_f2", arrow::utf8())),
+            DataField(3, arrow::field("sub_f3", arrow::decimal128(20, 5)))};
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+        std::vector<DataField> fields = {field0, DataField(4, 
arrow::field("f1", arrow::utf8()))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_FALSE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Timezone-carrying timestamp as map value: true.
+        DataField field0 = DataField(
+            0, arrow::field("f0", arrow::map(arrow::int8(),
+                                             
arrow::timestamp(arrow::TimeUnit::NANO, timezone))));
+        std::vector<DataField> fields = {field0, DataField(1, 
arrow::field("f1", arrow::utf8()))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Timezone-carrying timestamp as map key: true.
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
arrow::map(arrow::timestamp(arrow::TimeUnit::NANO, timezone),
+                                             arrow::int8())));
+        std::vector<DataField> fields = {field0, DataField(1, 
arrow::field("f1", arrow::utf8()))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+    {  // Timezone-carrying timestamp as list element: true.
+        DataField field0 = DataField(
+            0, arrow::field("f0", 
arrow::list(arrow::timestamp(arrow::TimeUnit::NANO, timezone))));
+        std::vector<DataField> fields = {field0, DataField(1, 
arrow::field("f1", arrow::utf8()))};
+        std::shared_ptr<arrow::DataType> arrow_data_type =
+            DataField::ConvertDataFieldsToArrowStructType(fields);
+        
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/core/schema/schema_validation.cpp 
b/src/paimon/core/schema/schema_validation.cpp
new file mode 100644
index 0000000..28949a4
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation.cpp
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_validation.h"
+
+#include <algorithm>
+#include <cassert>
+#include <map>
+#include <optional>
+#include <set>
+#include <sstream>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/preconditions.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/options/changelog_producer.h"
+#include "paimon/core/options/expire_config.h"
+#include "paimon/core/options/merge_engine.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/defs.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Reports whether `field` has one of the types that ValidateNotContainComplexType rejects
+/// for partition keys: an Arrow TIMESTAMP, an Arrow DECIMAL, or a field that
+/// BlobUtils::IsBlobField recognises.
+///
+/// @param field Arrow field to classify.
+/// @return true when the field matches one of the three categories above.
+bool SchemaValidation::IsComplexType(const std::shared_ptr<arrow::Field>& field) {
+    const auto type_id = field->type()->id();
+    if (type_id == arrow::Type::TIMESTAMP || type_id == arrow::Type::DECIMAL) {
+        return true;
+    }
+    return BlobUtils::IsBlobField(field);
+}
+
+/// Runs every table-level validation against `schema` and stops at the first failure.
+///
+/// The checks run in a fixed order: key lists (duplicates, membership in the table columns,
+/// allowed types), bucket configuration, `fields.*` options, sequence field / sequence group
+/// options, changelog producer, snapshot retention bounds, reserved field names, deletion
+/// vectors, row tracking and finally the blob-field options.
+///
+/// @param schema Table schema to validate; its option map is parsed into a CoreOptions.
+/// @return Status::OK() when every check passes, otherwise the status of the first check that
+///         failed (including a failure to parse the options).
+Status SchemaValidation::ValidateTableSchema(const TableSchema& schema) {
+    const auto& column_names = schema.FieldNames();
+    const auto& primary_keys = schema.PrimaryKeys();
+    const auto& partition_keys = schema.PartitionKeys();
+
+    // Key lists must not name the same field twice.
+    PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(schema.BucketKeys(), "bucket key"));
+    PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(primary_keys, "primary key"));
+    PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(partition_keys, "partition key"));
+
+    // Partition and primary keys must all be columns of the table.
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        ObjectUtils::ContainsAll(column_names, partition_keys),
+        "Table column {} should include all partition fields {}", column_names, partition_keys));
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        ObjectUtils::ContainsAll(column_names, primary_keys),
+        "Table column {} should include all primary key constraint {}", column_names,
+        primary_keys));
+
+    // Keys may not use nested types.
+    const auto& columns = schema.Fields();
+    PAIMON_RETURN_NOT_OK(ValidateOnlyContainPrimitiveType(columns, primary_keys, "primary key"));
+    PAIMON_RETURN_NOT_OK(ValidateOnlyContainPrimitiveType(columns, partition_keys, "partition"));
+    // TODO(lisizhuo.lsz): C++ Paimon does not support timestamp & decimal type in partition
+    // keys for now.
+    PAIMON_RETURN_NOT_OK(ValidateNotContainComplexType(columns, partition_keys));
+
+    // Everything below depends on the parsed table options.
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions options, CoreOptions::FromMap(schema.Options()));
+    PAIMON_RETURN_NOT_OK(ValidateBucket(schema, options));
+    // PAIMON_RETURN_NOT_OK(ValidateDefaultValues(schema));
+    // PAIMON_RETURN_NOT_OK(ValidateStartupMode(options));
+    PAIMON_RETURN_NOT_OK(ValidateFieldsPrefix(schema, options));
+    PAIMON_RETURN_NOT_OK(ValidateSequenceField(schema, options));
+    PAIMON_RETURN_NOT_OK(ValidateSequenceGroup(schema, options));
+
+    // A changelog producer only makes sense on a primary-key table.
+    const ChangelogProducer changelog_producer = options.GetChangelogProducer();
+    if (changelog_producer != ChangelogProducer::NONE && primary_keys.empty()) {
+        return Status::Invalid(fmt::format(
+            "Can not set {} on table without primary keys, please define primary keys.",
+            Options::CHANGELOG_PRODUCER));
+    }
+    PAIMON_RETURN_NOT_OK(ValidateChangelogProducer(options));
+
+    // Snapshot retention: min must be positive and must not exceed max.
+    const std::string retain_min_key = std::string(Options::SNAPSHOT_NUM_RETAINED_MIN);
+    PAIMON_RETURN_NOT_OK(
+        Preconditions::CheckState(options.GetExpireConfig().GetSnapshotRetainMin() > 0,
+                                  retain_min_key + " should be at least 1"));
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        options.GetExpireConfig().GetSnapshotRetainMin() <=
+            options.GetExpireConfig().GetSnapshotRetainMax(),
+        retain_min_key + " should not be larger than " +
+            std::string(Options::SNAPSHOT_NUM_RETAINED_MAX)));
+
+    // TODO(yonghao.fyh): check changelog num retain
+    // TODO(yonghao.fyh): support file format validate data fields
+    // User columns may neither be a special field nor start with the key-field prefix.
+    for (const auto& column_name : column_names) {
+        if (SpecialFields::IsSpecialFieldName(column_name)) {
+            return Status::Invalid(
+                fmt::format("field name '{}' in schema cannot be special field.", column_name));
+        }
+        if (StringUtils::StartsWith(column_name, SpecialFields::KEY_FIELD_PREFIX)) {
+            return Status::Invalid(fmt::format("field name '{}' in schema cannot start with '{}'.",
+                                               column_name, SpecialFields::KEY_FIELD_PREFIX));
+        }
+    }
+    // TODO(yonghao.fyh): check streaming read overwrite
+    // TODO(yonghao.fyh): check 'partition.expiration-time'
+    // TODO(yonghao.fyh): check 'rowkind.field'
+    if (options.DeletionVectorsEnabled()) {
+        PAIMON_RETURN_NOT_OK(ValidateForDeletionVectors(options));
+    }
+
+    PAIMON_RETURN_NOT_OK(ValidateRowTracking(schema, options));
+    PAIMON_RETURN_NOT_OK(ValidateBlobFields(schema, options));
+    return Status::OK();
+}
+
+/// Rejects a key list that names the same field more than once.
+///
+/// @param field_names Names to inspect (for example the primary-key list).
+/// @param error_message_intro Label of the list; used as the prefix of the error message.
+/// @return Status::OK() when no name is repeated; otherwise the failure produced by
+///         Preconditions::CheckState, whose message lists the whole input and the duplicates.
+Status SchemaValidation::ValidateNoDuplicateField(const std::vector<std::string>& field_names,
+                                                  const std::string& error_message_intro) {
+    const auto duplicate_field_names = ObjectUtils::DuplicateItems(field_names);
+    if (duplicate_field_names.empty()) {
+        // Success path: do not pay for joining and formatting a message nobody will read.
+        return Status::OK();
+    }
+    // Still routed through CheckState so the resulting status kind stays the same.
+    return Preconditions::CheckState(
+        false,
+        fmt::format("{} [{}] must not contain duplicate fields. Found: [{}]", error_message_intro,
+                    fmt::join(field_names, ", "), fmt::join(duplicate_field_names, ", ")));
+}
+
+/// Ensures that none of the named fields has a nested Arrow type.
+///
+/// @param fields All fields of the table schema.
+/// @param field_names Names of the fields to check (e.g. primary or partition keys).
+/// @param error_message_intro Label inserted into the error message ("primary key", ...).
+/// @return Status::OK() when `field_names` is empty or every named field is non-nested;
+///         Status::Invalid for the first nested field, or when a name is missing from `fields`.
+Status SchemaValidation::ValidateOnlyContainPrimitiveType(
+    const std::vector<DataField>& fields, const std::vector<std::string>& field_names,
+    const std::string& error_message_intro) {
+    if (field_names.empty()) {
+        return Status::OK();
+    }
+
+    // Index the schema by field name; a later field with the same name replaces an earlier one.
+    std::unordered_map<std::string, std::shared_ptr<arrow::DataType>> type_by_name;
+    for (const auto& schema_field : fields) {
+        type_by_name.insert_or_assign(schema_field.Name(), schema_field.Type());
+    }
+
+    for (const auto& wanted_name : field_names) {
+        const auto found = type_by_name.find(wanted_name);
+        if (found == type_by_name.end()) {
+            // ValidateTableSchema verifies key membership before calling this function, so a
+            // miss here is treated as a programming error in debug builds.
+            assert(false);
+            return Status::Invalid(fmt::format(
+                "unexpected error, field {} not found in fields map", wanted_name));
+        }
+        const auto& data_type = found->second;
+        if (ArrowSchemaValidator::IsNestedType(data_type)) {
+            return Status::Invalid(fmt::format("The type {} in {} field {} is unsupported",
+                                               data_type->ToString(), error_message_intro,
+                                               found->first));
+        }
+    }
+    return Status::OK();
+}
+
+/// Ensures that none of the named fields is a "complex" field as defined by IsComplexType
+/// (timestamp, decimal or blob).
+///
+/// @param fields All fields of the table schema.
+/// @param field_names Names of the fields to check; the error text refers to them as
+///        partition fields.
+/// @return Status::OK() when `field_names` is empty or no named field is complex;
+///         Status::Invalid for the first complex field, or when a name is missing from `fields`.
+Status SchemaValidation::ValidateNotContainComplexType(
+    const std::vector<DataField>& fields, const std::vector<std::string>& field_names) {
+    if (field_names.empty()) {
+        return Status::OK();
+    }
+
+    // Index the schema by field name; a later field with the same name replaces an earlier one.
+    std::unordered_map<std::string, std::shared_ptr<arrow::Field>> arrow_field_by_name;
+    for (const auto& schema_field : fields) {
+        arrow_field_by_name.insert_or_assign(schema_field.Name(), schema_field.ArrowField());
+    }
+
+    for (const auto& wanted_name : field_names) {
+        const auto found = arrow_field_by_name.find(wanted_name);
+        if (found == arrow_field_by_name.end()) {
+            // ValidateTableSchema verifies key membership before calling this function, so a
+            // miss here is treated as a programming error in debug builds.
+            assert(false);
+            return Status::Invalid(fmt::format(
+                "unexpected error, partition field {} not found in schema", wanted_name));
+        }
+        const auto& arrow_field = found->second;
+        if (IsComplexType(arrow_field)) {
+            return Status::Invalid(fmt::format("The field {} in partition field {} is unsupported",
+                                               arrow_field->ToString(), found->first));
+        }
+    }
+    return Status::OK();
+}
+
+/// Tells whether the table uses the postpone-bucket mode.
+///
+/// @param schema Table schema; only its primary-key list is consulted.
+/// @param bucket Configured bucket number.
+/// @return true when `bucket` equals BucketModeDefine::POSTPONE_BUCKET and the schema has at
+///         least one primary key.
+bool SchemaValidation::IsPostponeBucketTable(const TableSchema& schema, int32_t bucket) {
+    if (bucket != BucketModeDefine::POSTPONE_BUCKET) {
+        return false;
+    }
+    return !schema.PrimaryKeys().empty();
+}
+
+/// Validates the bucket number against the rest of the table definition.
+///
+/// - bucket == -1: 'bucket-key' must not be set, and a table without primary keys must not
+///   set 'full-compaction.delta-commits'.
+/// - bucket < 1 (other than a postpone-bucket table): rejected.
+/// - otherwise: cross-partition update is rejected, a table without primary keys needs bucket
+///   keys, and bucket keys must not have nested types.
+///
+/// @param schema Table schema providing primary, partition and bucket keys.
+/// @param options Parsed table options.
+/// @return Status::OK() or Status::Invalid describing the first violated rule.
+Status SchemaValidation::ValidateBucket(const TableSchema& schema, const CoreOptions& options) {
+    const int32_t bucket = options.GetBucket();
+
+    if (bucket == -1) {
+        const auto& option_map = options.ToMap();
+        if (option_map.count(Options::BUCKET_KEY) > 0) {
+            return Status::Invalid(
+                fmt::format("Cannot define '{}' with bucket -1, please specify a bucket number.",
+                            Options::BUCKET_KEY));
+        }
+        if (schema.PrimaryKeys().empty() &&
+            option_map.count("full-compaction.delta-commits") > 0) {
+            return Status::Invalid(
+                "AppendOnlyTable of unware or dynamic bucket does not support "
+                "'full-compaction.delta-commits'");
+        }
+        return Status::OK();
+    }
+
+    if (bucket < 1 && !IsPostponeBucketTable(schema, bucket)) {
+        return Status::Invalid("The number of buckets needs to be greater than 0.");
+    }
+
+    // From here on the table has a fixed bucket number or is a postpone-bucket table.
+    if (schema.CrossPartitionUpdate()) {
+        return Status::Invalid(fmt::format(
+            "You should use dynamic bucket (bucket = -1) mode in cross partition update case "
+            "(Primary key constraint '{}' not include all partition fields '{}').",
+            fmt::join(schema.PrimaryKeys(), ", "), fmt::join(schema.PartitionKeys(), ", ")));
+    }
+
+    const auto& bucket_keys = schema.BucketKeys();
+    if (bucket_keys.empty()) {
+        if (schema.PrimaryKeys().empty()) {
+            return Status::Invalid("You should define a 'bucket-key' for bucketed append mode.");
+        }
+        return Status::OK();
+    }
+
+    // Collect, in schema order, the bucket keys whose type is nested.
+    std::vector<std::string> nested_bucket_keys;
+    for (const auto& column : schema.Fields()) {
+        const bool is_bucket_key =
+            std::find(bucket_keys.begin(), bucket_keys.end(), column.Name()) != bucket_keys.end();
+        if (is_bucket_key && ArrowSchemaValidator::IsNestedType(column.Type())) {
+            nested_bucket_keys.push_back(column.Name());
+        }
+    }
+    if (!nested_bucket_keys.empty()) {
+        return Status::Invalid(
+            fmt::format("Nested type cannot be in bucket-key, in your table these keys are: {}",
+                        fmt::join(nested_bucket_keys, ", ")));
+    }
+    return Status::OK();
+}
+
+/// Accepts only tables whose changelog producer is ChangelogProducer::NONE.
+///
+/// @param options Parsed table options.
+/// @return Status::OK() for NONE; otherwise the failure produced by Preconditions::CheckState.
+Status SchemaValidation::ValidateChangelogProducer(const CoreOptions& options) {
+    const bool producer_is_none = options.GetChangelogProducer() == ChangelogProducer::NONE;
+    return Preconditions::CheckState(
+        producer_is_none,
+        "C++ Paimon does not support changelog-producer yet. Please "
+        "keep changelog-producer as 'none'.");
+}
+
+/// Validates options that interact with deletion vectors. ValidateTableSchema calls this only
+/// when deletion vectors are enabled.
+///
+/// @param options Parsed table options.
+/// @return Status::OK() when the changelog producer is NONE, INPUT or LOOKUP and the merge
+///         engine is not FIRST_ROW; otherwise the failure produced by Preconditions::CheckState.
+Status SchemaValidation::ValidateForDeletionVectors(const CoreOptions& options) {
+    const ChangelogProducer producer = options.GetChangelogProducer();
+    const bool producer_supported = producer == ChangelogProducer::NONE ||
+                                    producer == ChangelogProducer::INPUT ||
+                                    producer == ChangelogProducer::LOOKUP;
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        producer_supported,
+        "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now."));
+
+    const bool is_first_row_engine = options.GetMergeEngine() == MergeEngine::FIRST_ROW;
+    return Preconditions::CheckState(
+        !is_first_row_engine,
+        "First row merge engine does not need deletion vectors because there is "
+        "no deletion of old data in this merge engine.");
+}
+
+/// Validates the sequence-group options returned by CoreOptions::GetFieldsSequenceGroups.
+///
+/// Each entry maps a key (sequence fields joined by Options::FIELDS_SEPARATOR) to a value
+/// (grouped fields joined by the same separator). The function checks that
+///  - every sequence field and every grouped field is a column of the table,
+///  - no grouped field is claimed by more than one entry, and
+///  - no sequence field of any group has an aggregation function configured.
+///
+/// @param schema Table schema providing the column names.
+/// @param options Parsed table options.
+/// @return Status::OK() or Status::Invalid for the first violation; a failure of
+///         CoreOptions::GetFieldAggFunc is propagated unchanged.
+Status SchemaValidation::ValidateSequenceGroup(const TableSchema& schema,
+                                               const CoreOptions& options) {
+    const std::vector<std::string>& columns = schema.FieldNames();
+    const auto is_column = [&columns](const std::string& name) {
+        return std::find(columns.begin(), columns.end(), name) != columns.end();
+    };
+
+    // For every grouped field: the sequence fields of the group that claimed it.
+    std::unordered_map<std::string, std::set<std::string>> sequence_fields_by_member;
+    const auto group_options = options.GetFieldsSequenceGroups();
+    for (const auto& [sequence_key, members_value] : group_options) {
+        const std::vector<std::string> sequence_fields =
+            StringUtils::Split(sequence_key, Options::FIELDS_SEPARATOR);
+        for (const auto& sequence_field : sequence_fields) {
+            if (!is_column(sequence_field)) {
+                return Status::Invalid(
+                    fmt::format("The sequence field group: {} can not be found in table schema.",
+                                sequence_field));
+            }
+        }
+
+        for (const auto& member : StringUtils::Split(members_value, Options::FIELDS_SEPARATOR)) {
+            if (!is_column(member)) {
+                return Status::Invalid(
+                    fmt::format("Field {} can not be found in table schema.", member));
+            }
+
+            const auto claimed = sequence_fields_by_member.find(member);
+            if (claimed != sequence_fields_by_member.end()) {
+                // Render the earlier group first, then the current one, as "{a b } {c } ".
+                std::ostringstream groups_text;
+                const auto append_group = [&groups_text](const auto& group) {
+                    groups_text << "{";
+                    for (const auto& group_field : group) {
+                        groups_text << group_field << " ";
+                    }
+                    groups_text << "} ";
+                };
+                append_group(claimed->second);
+                append_group(sequence_fields);
+                return Status::Invalid(
+                    fmt::format("Field {} is defined repeatedly by multiple groups: {}.", member,
+                                groups_text.str()));
+            }
+            sequence_fields_by_member[member].insert(sequence_fields.begin(),
+                                                     sequence_fields.end());
+        }
+    }
+
+    // Sequence fields must not have an aggregation function of their own.
+    std::set<std::string> aggregated_sequence_fields;
+    for (const auto& member_entry : sequence_fields_by_member) {
+        for (const auto& sequence_field : member_entry.second) {
+            PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> agg_func,
+                                   options.GetFieldAggFunc(sequence_field));
+            if (agg_func.has_value()) {
+                aggregated_sequence_fields.insert(sequence_field);
+            }
+        }
+    }
+
+    if (aggregated_sequence_fields.empty()) {
+        return Status::OK();
+    }
+    std::ostringstream message;
+    message << "Should not define aggregation function on sequence group: ";
+    for (const auto& sequence_field : aggregated_sequence_fields) {
+        message << sequence_field << " ";
+    }
+    return Status::Invalid(message.str());
+}
+
+/// Validates the sequence fields returned by CoreOptions::GetSequenceField.
+///
+/// When at least one sequence field is configured, each one must be a column of the table,
+/// must not have an aggregation function, and must be listed exactly once. In addition the
+/// merge engine must not be FIRST_ROW and the table must not be in cross-partition-update mode.
+///
+/// @param schema Table schema providing column names and the cross-partition-update flag.
+/// @param options Parsed table options.
+/// @return Status::OK() when no sequence field is configured or all rules hold; otherwise the
+///         first failure (a failure of CoreOptions::GetFieldAggFunc is propagated unchanged).
+Status SchemaValidation::ValidateSequenceField(const TableSchema& schema,
+                                               const CoreOptions& options) {
+    const std::vector<std::string> sequence_fields = options.GetSequenceField();
+    if (sequence_fields.empty()) {
+        return Status::OK();
+    }
+
+    // Count how often each sequence field is listed.
+    std::unordered_map<std::string, int> occurrences;
+    for (const auto& name : sequence_fields) {
+        ++occurrences[name];
+    }
+
+    const auto& columns = schema.FieldNames();
+    for (const auto& name : sequence_fields) {
+        const bool is_column = std::find(columns.begin(), columns.end(), name) != columns.end();
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            is_column, fmt::format("Sequence field: '{}' cannot be found in table schema.", name)));
+
+        PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> agg_func,
+                               options.GetFieldAggFunc(name));
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            !agg_func.has_value(),
+            fmt::format("Should not define aggregation on sequence field: '{}'.", name)));
+
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            occurrences[name] == 1, "Sequence field '" + name + "' is defined repeatedly."));
+    }
+
+    // Sequence fields cannot be combined with the FIRST_ROW merge engine.
+    if (options.GetMergeEngine() == MergeEngine::FIRST_ROW) {
+        return Status::Invalid("Do not support using sequence field on FIRST_ROW merge engine.");
+    }
+
+    // Sequence fields cannot be combined with cross-partition update.
+    if (schema.CrossPartitionUpdate()) {
+        return Status::Invalid(fmt::format(
+            "You cannot use sequence.field in cross partition update case (Primary "
+            "key constraint '{}'  not including all partition fields '{}').",
+            fmt::join(schema.PrimaryKeys(), ", "), fmt::join(schema.PartitionKeys(), ", ")));
+    }
+    return Status::OK();
+}
+
+/// Validates every option whose key starts with Options::FIELDS_PREFIX.
+///
+/// The key is split on '.', and its second segment is split on Options::FIELDS_SEPARATOR.
+/// Each resulting name must either equal Options::DEFAULT_AGG_FUNCTION or be a column of the
+/// table.
+///
+/// @param schema Table schema providing the column names.
+/// @param options Parsed table options.
+/// @return Status::OK() when all such keys are valid; Status::Invalid for a key with fewer than
+///         two segments; otherwise the failure produced by Preconditions::CheckState.
+Status SchemaValidation::ValidateFieldsPrefix(const TableSchema& schema,
+                                              const CoreOptions& options) {
+    const auto& columns = schema.FieldNames();
+    const auto& option_map = options.ToMap();
+    for (const auto& option : option_map) {
+        const auto& key = option.first;
+        if (!StringUtils::StartsWith(key, Options::FIELDS_PREFIX)) {
+            continue;
+        }
+        const std::vector<std::string> segments = StringUtils::Split(key, ".");
+        if (segments.size() < 2) {
+            return Status::Invalid("invalid options key " + key);
+        }
+        const std::vector<std::string> referenced_names =
+            StringUtils::Split(segments[1], Options::FIELDS_SEPARATOR);
+        for (const auto& name : referenced_names) {
+            const bool is_known =
+                Options::DEFAULT_AGG_FUNCTION == name ||
+                std::find(columns.begin(), columns.end(), name) != columns.end();
+            PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+                is_known, "Field " + name + " can not be found in table schema."));
+        }
+    }
+    return Status::OK();
+}
+
+/// Validates row-tracking, data-evolution and BLOB-column constraints.
+///
+/// - Row tracking requires bucket == -1 and a table without primary keys.
+/// - Data evolution requires row tracking and is incompatible with deletion vectors.
+/// - If the schema has BLOB columns: none of them may be a partition key, data evolution must
+///   be enabled, and the table must contain at least one non-BLOB column.
+///
+/// @param table_schema Table schema providing fields, primary keys and partition keys.
+/// @param options Parsed table options.
+/// @return Status::OK() when all rules hold; otherwise the first failure.
+Status SchemaValidation::ValidateRowTracking(const TableSchema& table_schema,
+                                             const CoreOptions& options) {
+    const bool row_tracking_enabled = options.RowTrackingEnabled();
+    if (row_tracking_enabled) {
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            options.GetBucket() == -1,
+            "Cannot define {} for row tracking table, it only support bucket = -1",
+            Options::BUCKET));
+        PAIMON_RETURN_NOT_OK(
+            Preconditions::CheckState(table_schema.PrimaryKeys().empty(),
+                                      "Cannot define primary key for row tracking table"));
+    }
+    if (options.DataEvolutionEnabled()) {
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            row_tracking_enabled, "Data evolution config must enabled with row-tracking.enabled"));
+        PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+            !options.DeletionVectorsEnabled(),
+            "Data evolution config must disabled with deletion-vectors.enabled"));
+    }
+
+    // Names of all BLOB columns, in schema order.
+    std::vector<std::string> blob_names;
+    for (const auto& field : table_schema.Fields()) {
+        if (BlobUtils::IsBlobField(field.ArrowField())) {
+            blob_names.push_back(field.Name());
+        }
+    }
+    if (blob_names.empty()) {
+        return Status::OK();
+    }
+
+    // Bind the partition keys once so that both iterators handed to std::find are guaranteed
+    // to come from the same container object, whatever the accessor's return type is.
+    const auto& partition_keys = table_schema.PartitionKeys();
+    for (const auto& blob_field_name : blob_names) {
+        const bool is_partition_key =
+            std::find(partition_keys.begin(), partition_keys.end(), blob_field_name) !=
+            partition_keys.end();
+        if (is_partition_key) {
+            return Status::Invalid(
+                fmt::format("Blob field {} cannot be a partition key.", blob_field_name));
+        }
+    }
+
+    // BLOB columns require data evolution and at least one ordinary column next to them.
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        options.DataEvolutionEnabled(),
+        "Data evolution config must be enabled for table with BLOB type column."));
+    PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+        table_schema.Fields().size() > blob_names.size(),
+        "Table with BLOB type column must have other normal columns."));
+    return Status::OK();
+}
+
+/// Validates the four blob-related field lists read from `options` (blob fields, blob
+/// descriptor fields, blob view fields and blob external-storage fields).
+///
+/// For each non-empty list: names must be unique, must resolve through TableSchema::GetFields,
+/// and every resolved field must be a BLOB field. Additionally a view field must not also be a
+/// descriptor field, every external-storage field must also be a descriptor field, and a
+/// non-empty external-storage path is required when external-storage fields are configured.
+///
+/// @param schema Table schema used to resolve the configured names.
+/// @param options Parsed table options.
+/// @return Status::OK() when nothing is configured or all rules hold; otherwise the first
+///         failure.
+Status SchemaValidation::ValidateBlobFields(const TableSchema& schema, const CoreOptions& options) {
+    const auto& blob_names = options.GetBlobFields();
+    const auto& descriptor_names = options.GetBlobDescriptorFields();
+    const auto& view_names = options.GetBlobViewFields();
+    const auto& external_storage_names = options.GetBlobExternalStorageFields();
+
+    const bool nothing_configured = blob_names.empty() && descriptor_names.empty() &&
+                                    view_names.empty() && external_storage_names.empty();
+    if (nothing_configured) {
+        return Status::OK();
+    }
+
+    // Shared per-list check: unique names that all resolve to BLOB fields of the schema.
+    const auto check_list = [&schema](const std::vector<std::string>& names,
+                                      const std::string& option_key) -> Status {
+        if (names.empty()) {
+            return Status::OK();
+        }
+        PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(names, option_key));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> resolved, schema.GetFields(names));
+        for (const auto& candidate : resolved) {
+            if (BlobUtils::IsBlobField(candidate.ArrowField())) {
+                continue;
+            }
+            return Status::Invalid(
+                fmt::format("Field '{}' in '{}' must be a BLOB field in table schema.",
+                            candidate.Name(), option_key));
+        }
+        return Status::OK();
+    };
+
+    PAIMON_RETURN_NOT_OK(check_list(blob_names, Options::BLOB_FIELD));
+    PAIMON_RETURN_NOT_OK(check_list(descriptor_names, Options::BLOB_DESCRIPTOR_FIELD));
+    PAIMON_RETURN_NOT_OK(check_list(view_names, Options::BLOB_VIEW_FIELD));
+    PAIMON_RETURN_NOT_OK(
+        check_list(external_storage_names, Options::BLOB_EXTERNAL_STORAGE_FIELD));
+
+    const std::set<std::string> descriptor_set(descriptor_names.begin(), descriptor_names.end());
+
+    // A field cannot be both a view field and a descriptor field.
+    for (const auto& view_name : view_names) {
+        if (descriptor_set.find(view_name) != descriptor_set.end()) {
+            return Status::Invalid(fmt::format("Field '{}' in '{}' can not also be in '{}'.",
+                                               view_name, Options::BLOB_VIEW_FIELD,
+                                               Options::BLOB_DESCRIPTOR_FIELD));
+        }
+    }
+
+    // Every external-storage field must also be declared as a descriptor field.
+    for (const auto& external_name : external_storage_names) {
+        if (descriptor_set.find(external_name) == descriptor_set.end()) {
+            return Status::Invalid(fmt::format(
+                "Field '{}' in '{}' must also be in '{}'.", external_name,
+                Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::BLOB_DESCRIPTOR_FIELD));
+        }
+    }
+
+    if (external_storage_names.empty()) {
+        return Status::OK();
+    }
+    // External-storage fields need a non-empty storage path.
+    auto external_storage_path = options.GetBlobExternalStoragePath();
+    if (!external_storage_path || external_storage_path->empty()) {
+        return Status::Invalid(fmt::format("'{}' must be set when '{}' is configured.",
+                                           Options::BLOB_EXTERNAL_STORAGE_PATH,
+                                           Options::BLOB_EXTERNAL_STORAGE_FIELD));
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/schema_validation.h 
b/src/paimon/core/schema/schema_validation.h
new file mode 100644
index 0000000..af73ee6
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class DataType;
+class Field;
+}  // namespace arrow
+
+namespace paimon {
+class CoreOptions;
+class DataField;
+class TableSchema;
+
+/// Validation utils for `TableSchema`.
+///
+/// A collection of static checks; the class cannot be instantiated. The single entry point
+/// is ValidateTableSchema, which runs the private helpers below in a fixed order and returns
+/// the first failure.
+class SchemaValidation {
+ public:
+    // Static-only utility: construction and destruction are disabled.
+    SchemaValidation() = delete;
+    ~SchemaValidation() = delete;
+
+    /// Validates keys, field names and options of `schema`.
+    /// @return Status::OK() when every check passes, otherwise the first failing status.
+    static Status ValidateTableSchema(const TableSchema& schema);
+
+    /// @return true when `schema` has at least one primary key and `bucket` equals
+    ///         BucketModeDefine::POSTPONE_BUCKET.
+    static bool IsPostponeBucketTable(const TableSchema& schema, int32_t 
bucket);
+
+ private:
+    /// Fails when `field_names` contains a repeated name; `error_message_intro` labels the
+    /// list in the error message.
+    static Status ValidateNoDuplicateField(const std::vector<std::string>& 
field_names,
+                                           const std::string& 
error_message_intro);
+    /// Fails when any field named in `field_names` has a nested Arrow type.
+    static Status ValidateOnlyContainPrimitiveType(const 
std::vector<DataField>& fields,
+                                                   const 
std::vector<std::string>& field_names,
+                                                   const std::string& 
error_message_intro);
+    /// Fails when any field named in `field_names` is complex according to IsComplexType.
+    static Status ValidateNotContainComplexType(const std::vector<DataField>& 
fields,
+                                                const 
std::vector<std::string>& field_names);
+    /// Validates the configured bucket number against primary, partition and bucket keys.
+    static Status ValidateBucket(const TableSchema& schema, const CoreOptions& 
options);
+    /// Placeholder: always returns NotImplemented. Its call in ValidateTableSchema is
+    /// commented out.
+    static Status ValidateDefaultValues(const TableSchema& schema) {
+        return Status::NotImplemented("validate default values not 
implemented");
+    }
+    /// Placeholder: always returns NotImplemented. Its call in ValidateTableSchema is
+    /// commented out.
+    static Status ValidateStartupMode(const CoreOptions& options) {
+        return Status::NotImplemented("validate startup mode not implemented");
+    }
+    /// Checks that options starting with Options::FIELDS_PREFIX reference existing columns
+    /// (or Options::DEFAULT_AGG_FUNCTION).
+    static Status ValidateFieldsPrefix(const TableSchema& schema, const 
CoreOptions& options);
+    /// Checks the configured sequence fields: existing columns, no aggregation function,
+    /// listed once, no FIRST_ROW merge engine, no cross-partition update.
+    static Status ValidateSequenceField(const TableSchema& schema, const 
CoreOptions& options);
+    /// Checks the configured sequence groups: existing columns, no field claimed by two
+    /// groups, no aggregation function on a group's sequence fields.
+    static Status ValidateSequenceGroup(const TableSchema& schema, const 
CoreOptions& options);
+    /// Fails unless the changelog producer is ChangelogProducer::NONE.
+    static Status ValidateChangelogProducer(const CoreOptions& options);
+    /// Checks changelog producer and merge engine for use with deletion vectors.
+    static Status ValidateForDeletionVectors(const CoreOptions& options);
+
+    /// Checks row-tracking, data-evolution and BLOB-column constraints.
+    static Status ValidateRowTracking(const TableSchema& table_schema, const 
CoreOptions& options);
+
+    /// Checks the blob field, blob descriptor, blob view and blob external-storage options.
+    static Status ValidateBlobFields(const TableSchema& schema, const 
CoreOptions& options);
+
+    /// @return true for Arrow TIMESTAMP or DECIMAL fields and for fields that
+    ///         BlobUtils::IsBlobField recognises.
+    static bool IsComplexType(const std::shared_ptr<arrow::Field>& field);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/schema/schema_validation_test.cpp 
b/src/paimon/core/schema/schema_validation_test.cpp
new file mode 100644
index 0000000..0a08efe
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation_test.cpp
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_validation.h"
+
+#include <map>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/defs.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(SchemaValidationTest, TestSimple) {  // Happy path: a primary-key table with a fixed bucket count passes validation.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};  // The partition key f1 is also one of the primary keys.
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));  // No validation error is expected.
+}
+
+TEST(SchemaValidationTest, TestRowTracking) {  // A table without primary keys, with row tracking and data evolution enabled, passes validation.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {};  // No primary keys are declared for this table.
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {
+        {Options::BUCKET, "-1"},
+        {Options::ROW_TRACKING_ENABLED, "true"},
+        {Options::DATA_EVOLUTION_ENABLED, "true"},
+    };
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));  // No validation error is expected.
+}
+
+TEST(SchemaValidationTest, TestWithBlobField) {  // Covers accepted and rejected combinations of the BLOB-related options, one scoped case each.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    std::shared_ptr<arrow::Field> f3 = BlobUtils::ToArrowField("f3", false);  // NOTE(review): the meaning of the `false` argument is not visible in this file.
+    std::shared_ptr<arrow::Field> f4 = BlobUtils::ToArrowField("f4", false);
+    {  // Case 1: one BLOB column (f3) named in Options::BLOB_FIELD -> valid.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Case 2: two BLOB columns (f3, f4), both named in Options::BLOB_FIELD -> valid.
+        arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"f3,f4"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Case 3: f3 is the descriptor and external-storage field (path set), f4 is the view field -> valid.
+        arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "-1"},
+            {Options::ROW_TRACKING_ENABLED, "true"},
+            {Options::DATA_EVOLUTION_ENABLED, "true"},
+            {Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+            {Options::BLOB_VIEW_FIELD, "f4"},
+            {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"},
+            {Options::BLOB_EXTERNAL_STORAGE_PATH, 
"FILE:///tmp/blob_external_storage/"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Case 4: Options::BLOB_DESCRIPTOR_FIELD names f0, which is a utf8 column -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      
{Options::BLOB_DESCRIPTOR_FIELD, "f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Field 'f0' in 'blob-descriptor-field' must be a BLOB field in 
table schema.");
+    }
+    {  // Case 5: f3 is named in both the descriptor option and the view option -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      
{Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+                                                      
{Options::BLOB_VIEW_FIELD, "f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Field 'f3' in 'blob-view-field' can not also be in 
'blob-descriptor-field'.");
+    }
+    {  // Case 6: external-storage field f4 is not named in the descriptor option (only f3 is) -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "-1"},
+            {Options::ROW_TRACKING_ENABLED, "true"},
+            {Options::DATA_EVOLUTION_ENABLED, "true"},
+            {Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+            {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f4"},
+            {Options::BLOB_EXTERNAL_STORAGE_PATH, 
"FILE:///tmp/blob_external_storage/"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Field 'f4' in 'blob-external-storage-field' must also be in 
'blob-descriptor-field'.");
+    }
+    {  // Case 7: an external-storage field is configured but Options::BLOB_EXTERNAL_STORAGE_PATH is absent -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      
{Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+                                                      
{Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "'blob-external-storage-path' must be set when "
+                            "'blob-external-storage-field' is configured.");
+    }
+    {  // Case 8: a BLOB column is present while Options::DATA_EVOLUTION_ENABLED is "false" -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "false"},
+                                                      {Options::BLOB_FIELD, 
"f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Data evolution config must be enabled for table with BLOB type 
column.");
+    }
+    {  // Case 9: the schema's only column is the BLOB column f3 -> rejected.
+        arrow::FieldVector fields = {f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Table with BLOB type column must have other 
normal columns.");
+    }
+    {  // Case 10: Options::BLOB_FIELD names a column that does not exist in the schema -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"non-exist"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Get field non-exist failed: not exist in table 
schema");
+    }
+    {  // Case 11: Options::BLOB_FIELD lists f0, which is utf8 rather than BLOB -> rejected.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"f3,f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Field 'f0' in 'blob-field' must be a BLOB field 
in table schema.");
+    }
+    {  // Case 12: BLOB column f3 is the partition key; unlike the other cases this calls ValidateRowTracking directly.
+        arrow::FieldVector fields = {f0, f1, f2, f3};
+        auto schema = arrow::schema(fields);
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f3"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::BLOB_FIELD, 
"f3"}};
+        ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options));  // Parsed options are needed because ValidateRowTracking takes CoreOptions explicitly.
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateRowTracking(*table_schema, 
core_options),
+                            "Blob field f3 cannot be a partition key.");
+    }
+}
+
+TEST(SchemaValidationTest, TestDuplicateField) {  // Duplicate names in primary keys, partition keys or bucket keys must each be rejected.
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));  // NOTE(review): f0 is a map column yet the duplicate error is the one expected - presumably that check is reported first; confirm.
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    {  // Only the primary-key list differs from the baseline above.
+        // duplicate primary keys
+        std::vector<std::string> dup_primary_keys = {"f0", "f1", "f1"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
partition_keys,
+                                                 dup_primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "primary key [f0, f1, f1] must not contain duplicate fields. 
Found: [f1]");
+    }
+    {  // Only the partition-key list differs from the baseline above.
+        // duplicate partition keys
+        std::vector<std::string> dup_partition_keys = {"f1", "f1"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
dup_partition_keys,
+                                                 primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "partition key [f1, f1] must not contain duplicate fields. Found: 
[f1]");
+    }
+    {  // Only the Options::BUCKET_KEY value differs from the baseline above.
+        // duplicate bucket keys
+        std::map<std::string, std::string> dup_options = {{Options::BUCKET, 
"2"},
+                                                          
{Options::BUCKET_KEY, "f0,f0"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
partition_keys,
+                                                 primary_keys, dup_options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "bucket key [f0, f0] must not contain duplicate 
fields. Found: [f0]");
+    }
+}
+
+TEST(SchemaValidationTest, TestNonExistField) {  // Primary or partition keys naming a column absent from the schema must be rejected.
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    {  // The expected message lists both the table columns and the offending primary keys.
+        // non-exist primary keys
+        std::vector<std::string> non_exist_primary_keys = {"f0", "f1", 
"non-exist"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
partition_keys,
+                                                 non_exist_primary_keys, 
options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            R"(Table column ["f0", "f1", "f2"] should include all primary key 
constraint ["f0", "f1", "non-exist"])");
+    }
+    {  // The expected message lists both the table columns and the offending partition fields.
+        // non-exist partition keys
+        std::vector<std::string> non_exist_partition_keys = {"f1", 
"non-exist"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
non_exist_partition_keys,
+                                                 primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            R"(Table column ["f0", "f1", "f2"] should include all partition 
fields ["f1", "non-exist"])");
+    }
+}
+
+TEST(SchemaValidationTest, NonPrimitivePrimaryKeyList) {  // A list-typed column used as a primary key must be rejected.
+    auto value_field = arrow::field("values", arrow::int32());
+    auto f0 = arrow::field("f0", arrow::list(value_field));  // f0 is list<int32>; it is the offending primary key.
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f0 is unsupported");
+}
+
+TEST(SchemaValidationTest, NonPrimitivePrimaryKeyMap) {  // A map-typed column used as a primary key must be rejected.
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));  // f0 is map<utf8, int32>; it is the offending primary key.
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f0 is unsupported");
+}
+
+TEST(SchemaValidationTest, NonPrimitivePartitionKeyStruct) {  // A struct-typed column used as a key must be rejected.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto child1 = arrow::field("inner1", arrow::int32());
+    auto child2 = arrow::field("inner2", arrow::float64());
+    auto f1 = arrow::field("f1", arrow::struct_({child1, child2}));  // f1 is a two-member struct; it is the offending key.
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};  // f1 is both a primary key and the partition key here.
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestComplexPartitionKey) {  // A decimal128 column used as the partition key must be rejected.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::decimal128(5, 2));  // Despite the test name, the rejected type here is decimal128(5, 2), not a nested type.
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, {}));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "partition field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestComplexPartitionKeyWithBlob) {  // A column built by BlobUtils::ToArrowField used as the partition key must be rejected.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = BlobUtils::ToArrowField("f1");  // Single-argument form; any defaulted argument is not visible in this file.
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
/*primary_keys=*/{}, {}));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "partition field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestDateTypePartitionKey) {  // A date32 column is accepted as the partition key.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::date32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, {}));
+    ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));  // Validation succeeds with an empty options map.
+}
+
+TEST(SchemaValidationTest, ValidateFieldsPrefix) {  // Exercises option keys that start with "fields." against a schema holding f0, f1 and f2.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // Key "fields.f0,f1,f3" names f3, which is not a column -> rejected.
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, 
{"fields.f0,f1,f3", "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "f3 can not be found in table schema.");
+    }
+    {  // Key "fields.f0,f1,f2" names only existing columns -> accepted.
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, 
{"fields.f0,f1,f2", "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Options::FIELDS_DEFAULT_AGG_FUNC is accepted even though it names no column.
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {Options::FIELDS_DEFAULT_AGG_FUNC, "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // The bare key "fields." with nothing after the prefix -> rejected as an invalid key.
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, {"fields.", 
"f1"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "invalid options key fields.");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateBucket) {  // Rejected combinations of bucket count, bucket key, primary keys and partition keys.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    {  // Options::BUCKET = -1 together with an explicit bucket key -> rejected.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      {Options::BUCKET_KEY, 
"f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "please specify a bucket number.");
+    }
+    {  // Options::BUCKET = 0 -> rejected.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "0"},
+                                                      {Options::BUCKET_KEY, 
"f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "The number of buckets needs to be greater than 
0.");
+    }
+    {  // Fixed bucket count while the partition key f2 is not among the primary keys -> rejected.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "You should use dynamic bucket (bucket = -1) mode in cross 
partition update case");
+    }
+    {  // No primary keys, fixed bucket count, and no bucket key -> rejected.
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "You should define a 'bucket-key' for bucketed 
append mode");
+    }
+    {  // No primary keys and no bucket option, but "full-compaction.delta-commits" is set -> rejected.
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = 
{{"full-compaction.delta-commits", "2"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
partition_keys,
+                                                 /*primary_keys=*/{}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "AppendOnlyTable of unware or dynamic bucket does 
not support "
+                            "'full-compaction.delta-commits");
+    }
+    {  // The bucket key names f3, a map-typed column -> rejected.
+        auto f3 = arrow::field("f3", arrow::map(arrow::utf8(), 
arrow::int32()));
+        arrow::FieldVector new_fields = {f0, f1, f2, f3};
+        auto new_schema = arrow::schema(new_fields);
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f3"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, new_schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, 
options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Nested type cannot be in bucket-key, in your table these keys 
are: f3");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateDeletionVector) {  // Option combinations that are rejected when deletion vectors are enabled.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // Options::CHANGELOG_PRODUCER = "full-compaction" -> rejected with a changelog-producer message.
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {Options::DELETION_VECTORS_ENABLED, "true"},
+            {Options::CHANGELOG_PRODUCER, "full-compaction"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer 
yet");
+    }
+    {  // Options::MERGE_ENGINE = "first-row" together with deletion vectors -> rejected.
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{Options::DELETION_VECTORS_ENABLED, "true"},
+                                                      {Options::MERGE_ENGINE, 
"first-row"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "First row merge engine does not need deletion 
vectors because there "
+                            "is no deletion of old data in this merge 
engine.");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateSequenceField) {  // Checks on Options::SEQUENCE_FIELD.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);  // columns: f0 utf8, f1 int32, f2 float64
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // Every listed sequence field (f0,f1,f2) is a schema column: validation passes.
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{Options::SEQUENCE_FIELD, "f0,f1,f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Sequence field list names f3, which is not a schema column: must fail.
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{Options::SEQUENCE_FIELD, "f0,f1,f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "cannot be found in table schema.");
+    }
+    {  // SEQUENCE_FIELD together with MERGE_ENGINE=first-row: must fail.
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{Options::SEQUENCE_FIELD, "f0,f1,f2"},
+                                                      {Options::MERGE_ENGINE, 
"first-row"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Do not support using sequence field on FIRST_ROW 
merge engine.");
+    }
+    {  // BUCKET=-1 and partition key f2 is outside primary keys f0,f1: must fail.
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      
{Options::SEQUENCE_FIELD, "f0,f1,f2"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{"f2"},
+                                                 /*primary_keys=*/{"f0", 
"f1"}, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "You cannot use sequence.field in cross partition 
update case (Primary "
+                            "key constraint 'f0, f1'  not including all 
partition fields 'f2').");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateSequenceGroup) {  // Checks on sequence-group options.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);  // columns: f0 utf8, f1 int32, f2 float64
+    {  // Option key names f0,f1 and value names f2, all schema columns: validation passes.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{"fields.f0,f3.sequence-group", "f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // Option key names f3, which is not a schema column: must fail.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{"fields.f0,f3.sequence-group", "f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Field f3 can not be found in table schema.");
+    }
+    {  // Option value names f3, which is not a schema column: must fail.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{"fields.f0,f1.sequence-group", "f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Field f3 can not be found in table schema.");
+    }
+    {  // Two sequence-group options both list f0,f1 as their value: must fail.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, 
"f0"},
+                                                      
{"fields.f0,f1.sequence-group", "f0,f1"},
+                                                      
{"fields.f2.sequence-group", "f0,f1"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "defined repeatedly by multiple groups");
+    }
+    {  // aggregate-function is set on f0, which is also in a sequence-group key: must fail.
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {"fields.f0,f1.sequence-group", "f2"},
+            {"fields.f0.aggregate-function", "min"},
+        };
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, 
primary_keys, options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Should not define aggregation function on 
sequence group");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateInvalidConfiguration) {  // Misc. rejected configurations.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);  // columns: f0 utf8, f1 int32, f2 float64
+    {  // CHANGELOG_PRODUCER=input on a table with no primary keys: must fail.
+        std::map<std::string, std::string> options = 
{{Options::CHANGELOG_PRODUCER, "input"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Can not set changelog-producer on table without 
primary keys, please "
+                            "define primary keys.");
+    }
+    {  // Schema gains a column named _SEQUENCE_NUMBER: must fail as a special field name.
+        auto invalid_field = arrow::field("_SEQUENCE_NUMBER", arrow::int64());
+        arrow::FieldVector invalid_fields = fields;
+        invalid_fields.push_back(invalid_field);
+        auto invalid_schema = arrow::schema(invalid_fields);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, invalid_schema, 
/*partition_keys=*/{},
+                                /*primary_keys=*/{}, /*options=*/{}));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "field name '_SEQUENCE_NUMBER' in schema cannot be 
special field.");
+    }
+    {  // Schema gains a column named _KEY_a: must fail because of the _KEY_ prefix.
+        auto invalid_field = arrow::field("_KEY_a", arrow::int64());
+        arrow::FieldVector invalid_fields = fields;
+        invalid_fields.push_back(invalid_field);
+        auto invalid_schema = arrow::schema(invalid_fields);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, invalid_schema, 
/*partition_keys=*/{},
+                                /*primary_keys=*/{}, /*options=*/{}));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "field name '_KEY_a' in schema cannot start with 
'_KEY_'");
+    }
+    {  // CHANGELOG_PRODUCER=input plus MERGE_ENGINE=first-row, primary key f0: must fail.
+        std::map<std::string, std::string> options = 
{{Options::CHANGELOG_PRODUCER, "input"},
+                                                      {Options::MERGE_ENGINE, 
"first-row"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer 
yet");
+    }
+    {  // CHANGELOG_PRODUCER=lookup with primary key f0: must fail.
+        std::map<std::string, std::string> options = 
{{Options::CHANGELOG_PRODUCER, "lookup"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer 
yet");
+    }
+    // test for row tracking
+    {  // ROW_TRACKING_ENABLED with BUCKET=1 and a bucket key: must fail.
+        std::map<std::string, std::string> options = 
{{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      {Options::BUCKET, "1"},
+                                                      {Options::BUCKET_KEY, 
"f0"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, 
options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Cannot define bucket for row tracking table, it only support 
bucket = -1");
+    }
+    {  // ROW_TRACKING_ENABLED with BUCKET=-1 but primary key f0 defined: must fail.
+        std::map<std::string, std::string> options = 
{{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      {Options::BUCKET, "-1"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Cannot define primary key for row tracking 
table");
+    }
+    {  // DATA_EVOLUTION_ENABLED set without ROW_TRACKING_ENABLED: must fail.
+        std::map<std::string, std::string> options = 
{{Options::DATA_EVOLUTION_ENABLED, "true"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Data evolution config must enabled with 
row-tracking.enabled");
+    }
+    {  // DATA_EVOLUTION_ENABLED together with DELETION_VECTORS_ENABLED: must fail.
+        std::map<std::string, std::string> options = 
{{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      
{Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      
{Options::DELETION_VECTORS_ENABLED, "true"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, 
/*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, 
options));
+        
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Data evolution config must disabled with 
deletion-vectors.enabled");
+    }
+}
+}  // namespace paimon::test

Reply via email to