This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6ab1b2a feat(schema): add schema validation utilities (#55)
6ab1b2a is described below
commit 6ab1b2a260dc5a9b46d6b0236cf5f561b866c6a5
Author: Yonghao Fang <[email protected]>
AuthorDate: Mon Jun 8 15:20:36 2026 +0800
feat(schema): add schema validation utilities (#55)
* feat(schema): add schema validation utilities
* fix
---
src/paimon/core/schema/arrow_schema_validator.cpp | 266 +++++++
src/paimon/core/schema/arrow_schema_validator.h | 64 ++
.../core/schema/arrow_schema_validator_test.cpp | 348 +++++++++
src/paimon/core/schema/schema_validation.cpp | 510 +++++++++++++
src/paimon/core/schema/schema_validation.h | 79 ++
src/paimon/core/schema/schema_validation_test.cpp | 811 +++++++++++++++++++++
6 files changed, 2078 insertions(+)
diff --git a/src/paimon/core/schema/arrow_schema_validator.cpp
b/src/paimon/core/schema/arrow_schema_validator.cpp
new file mode 100644
index 0000000..22be2de
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator.cpp
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/arrow_schema_validator.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class KeyValueMetadata;
+} // namespace arrow
+
+namespace paimon {
+
+// Reports whether `data_type` is one of the container kinds (MAP, LIST or STRUCT)
+// whose child fields the name validators descend into.
+bool ArrowSchemaValidator::IsNestedType(const std::shared_ptr<arrow::DataType>& data_type) {
+    switch (data_type->id()) {
+        case arrow::Type::MAP:
+        case arrow::Type::LIST:
+        case arrow::Type::STRUCT:
+            return true;
+        default:
+            return false;
+    }
+}
+
+// Runs the schema-level checks in a fixed order and stops at the first failure:
+// duplicate names, whitespace-only names, then type support of every field.
+Status ArrowSchemaValidator::ValidateSchema(const arrow::Schema& schema) {
+    const arrow::FieldVector& top_level_fields = schema.fields();
+    // Both name checks run before any type check, so a bad name is reported first.
+    PAIMON_RETURN_NOT_OK(ValidateNoRedundantFields(top_level_fields));
+    PAIMON_RETURN_NOT_OK(ValidateNoWhitespaceOnlyFields(top_level_fields));
+    // ValidateField recurses into list/struct/map children on its own.
+    for (const std::shared_ptr<arrow::Field>& top_level_field : top_level_fields) {
+        PAIMON_RETURN_NOT_OK(ValidateField(top_level_field));
+    }
+    return Status::OK();
+}
+
+// Applies ValidateSchema and then checks that the field ids attached to the schema's
+// fields (nested ones included) are unique.
+Status ArrowSchemaValidator::ValidateSchemaWithFieldId(const arrow::Schema& schema) {
+    PAIMON_RETURN_NOT_OK(ValidateSchema(schema));
+    // One accumulator for the whole walk: the same set is handed down the recursion,
+    // so an id reused at any nesting level is detected.
+    std::set<int32_t> seen_field_ids;
+    // Wrapping the top-level fields in a struct lets the STRUCT branch of
+    // ValidateDataTypeWithFieldId handle the root like any nested struct.
+    const std::shared_ptr<arrow::DataType> root_type = arrow::struct_(schema.fields());
+    PAIMON_RETURN_NOT_OK(
+        ValidateDataTypeWithFieldId(root_type, /*key_value_metadata=*/nullptr, &seen_field_ids));
+    return Status::OK();
+}
+
+// Rejects `fields` if two entries share a name. Names are compared only among siblings:
+// each recursion level (children of a MAP, LIST or STRUCT) starts with its own empty set.
+Status ArrowSchemaValidator::ValidateNoRedundantFields(const arrow::FieldVector& fields) {
+    std::set<std::string> sibling_names;
+    for (const auto& current : fields) {
+        const std::string& name = current->name();
+        // insert() reports through `.second` whether the name was new to the set.
+        if (!sibling_names.insert(name).second) {
+            return Status::Invalid(fmt::format(
+                "validate schema failed: read schema has duplicate field {}", name));
+        }
+        if (!IsNestedType(current->type())) {
+            continue;
+        }
+        PAIMON_RETURN_NOT_OK(ValidateNoRedundantFields(current->type()->fields()));
+    }
+    return Status::OK();
+}
+
+// Rejects `fields` if any name, at any nesting level, is flagged by
+// StringUtils::IsNullOrWhitespaceOnly.
+Status ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(const arrow::FieldVector& fields) {
+    for (const auto& current : fields) {
+        const bool has_blank_name = StringUtils::IsNullOrWhitespaceOnly(current->name());
+        if (has_blank_name) {
+            return Status::Invalid(
+                fmt::format("validate schema failed: read schema has whitespace-only field"));
+        }
+        // Only container types have children whose names need the same check.
+        if (!IsNestedType(current->type())) {
+            continue;
+        }
+        PAIMON_RETURN_NOT_OK(ValidateNoWhitespaceOnlyFields(current->type()->fields()));
+    }
+    return Status::OK();
+}
+
+// Walks `type` recursively and verifies that every type met on the way is supported and
+// that every struct member carries a field id not seen earlier in the walk.
+//
+// type:               data type to inspect.
+// key_value_metadata: metadata of the field owning `type`; read only in the LARGE_BINARY
+//                     branch to decide whether the type is accepted as a blob. The root
+//                     call in ValidateSchemaWithFieldId passes nullptr.
+// field_id_set:       in/out accumulator of the ids seen so far; must not be null.
+//
+// Returns OK when the whole subtree is valid, Invalid for a duplicate id or an unsupported
+// type, and otherwise whatever error DataField::ConvertArrowFieldToDataField reports.
+Status ArrowSchemaValidator::ValidateDataTypeWithFieldId(
+    const std::shared_ptr<arrow::DataType>& type,
+    const std::shared_ptr<const arrow::KeyValueMetadata>& key_value_metadata,
+    std::set<int32_t>* field_id_set) {
+    switch (type->id()) {
+        case arrow::Type::type::BOOL:
+        case arrow::Type::type::INT8:
+        case arrow::Type::type::INT16:
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::INT64:
+        case arrow::Type::type::FLOAT:
+        case arrow::Type::type::DOUBLE:
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY:
+        case arrow::Type::type::DATE32:
+        case arrow::Type::type::DECIMAL128:
+        case arrow::Type::type::TIMESTAMP:
+            // Leaf types have no children and therefore no ids to collect.
+            return Status::OK();
+        case arrow::Type::type::LIST: {
+            // No id is read from the element field itself; only its type is walked.
+            const auto& element_field =
+                arrow::internal::checked_cast<const arrow::BaseListType&>(*type).value_field();
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                element_field->type(), element_field->metadata(), field_id_set));
+            return Status::OK();
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: `type` keeps the children alive, so copying the vector
+            // of shared_ptrs on every struct visit is unnecessary.
+            const arrow::FieldVector& members =
+                arrow::internal::checked_cast<const arrow::StructType&>(*type).fields();
+            for (const auto& member : members) {
+                PAIMON_ASSIGN_OR_RAISE(DataField data_field,
+                                       DataField::ConvertArrowFieldToDataField(member));
+                // insert() reports through `.second` whether the id was new.
+                if (!field_id_set->insert(data_field.Id()).second) {
+                    return Status::Invalid(fmt::format(
+                        "field id must be unique, duplicate field id {}", data_field.Id()));
+                }
+                PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                    member->type(), member->metadata(), field_id_set));
+            }
+            return Status::OK();
+        }
+        case arrow::Type::type::MAP: {
+            // As with lists, only the types behind the key and item fields are walked.
+            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(*type);
+            const auto& key_field = map_type.key_field();
+            const auto& item_field = map_type.item_field();
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                key_field->type(), key_field->metadata(), field_id_set));
+            PAIMON_RETURN_NOT_OK(ValidateDataTypeWithFieldId(
+                item_field->type(), item_field->metadata(), field_id_set));
+            return Status::OK();
+        }
+        case arrow::Type::type::LARGE_BINARY: {
+            // LARGE_BINARY is accepted only when the owning field's metadata marks a blob;
+            // otherwise it is reported like any other unsupported type.
+            if (BlobUtils::IsBlobMetadata(key_value_metadata)) {
+                return Status::OK();
+            }
+            [[fallthrough]];
+        }
+        default: {
+            return Status::Invalid("Unknown or unsupported arrow type: ", type->ToString());
+        }
+    }
+}
+
+// Checks that the type of `field`, and of every field nested inside it, is supported.
+//
+// Returns OK for supported types, the error of DecimalUtils::CheckDecimalType for a
+// rejected DECIMAL128, and Invalid("Unknown or unsupported arrow type: ...") for anything
+// else, including a LARGE_BINARY field that BlobUtils::IsBlobField does not accept.
+Status ArrowSchemaValidator::ValidateField(const std::shared_ptr<arrow::Field>& field) {
+    const arrow::DataType& field_type = *field->type();
+    switch (field_type.id()) {
+        case arrow::Type::type::BOOL:
+        case arrow::Type::type::INT8:
+        case arrow::Type::type::INT16:
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::INT64:
+        case arrow::Type::type::FLOAT:
+        case arrow::Type::type::DOUBLE:
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY:
+        case arrow::Type::type::DATE32:
+        case arrow::Type::type::TIMESTAMP:
+            return Status::OK();
+        case arrow::Type::type::DECIMAL128:
+            // The decimal's own parameters are checked by the shared helper.
+            PAIMON_RETURN_NOT_OK(DecimalUtils::CheckDecimalType(field_type));
+            return Status::OK();
+        case arrow::Type::type::LIST: {
+            const auto& element_field =
+                arrow::internal::checked_cast<const arrow::BaseListType&>(field_type).value_field();
+            PAIMON_RETURN_NOT_OK(ValidateField(element_field));
+            return Status::OK();
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: `field` keeps the children alive, so the vector of
+            // shared_ptrs does not need to be copied.
+            const arrow::FieldVector& members =
+                arrow::internal::checked_cast<const arrow::StructType&>(field_type).fields();
+            for (const auto& member : members) {
+                PAIMON_RETURN_NOT_OK(ValidateField(member));
+            }
+            return Status::OK();
+        }
+        case arrow::Type::type::MAP: {
+            const auto& map_type =
+                arrow::internal::checked_cast<const arrow::MapType&>(field_type);
+            const auto& key_field = map_type.key_field();
+            const auto& item_field = map_type.item_field();
+            PAIMON_RETURN_NOT_OK(ValidateField(key_field));
+            PAIMON_RETURN_NOT_OK(ValidateField(item_field));
+            return Status::OK();
+        }
+        case arrow::Type::type::LARGE_BINARY: {
+            // Accepted only for blob fields; otherwise reported as unsupported below.
+            if (BlobUtils::IsBlobField(field)) {
+                return Status::OK();
+            }
+            [[fallthrough]];
+        }
+        default: {
+            return Status::Invalid("Unknown or unsupported arrow type: ", field_type.ToString());
+        }
+    }
+}
+
+// Returns true when `type` is a TIMESTAMP with a non-empty timezone, or when such a
+// timestamp appears anywhere below it as a list element, struct member, map key or
+// map item. Every other type yields false.
+bool ArrowSchemaValidator::ContainTimestampWithTimezone(const arrow::DataType& type) {
+    switch (type.id()) {
+        case arrow::Type::type::TIMESTAMP: {
+            const auto& timestamp_type =
+                arrow::internal::checked_cast<const arrow::TimestampType&>(type);
+            return !timestamp_type.timezone().empty();
+        }
+        case arrow::Type::type::LIST: {
+            const auto& element_field =
+                arrow::internal::checked_cast<const arrow::ListType&>(type).value_field();
+            return ContainTimestampWithTimezone(*element_field->type());
+        }
+        case arrow::Type::type::STRUCT: {
+            // Bind by reference: `type` outlives this call, so the child vector does not
+            // need to be copied.
+            const arrow::FieldVector& members =
+                arrow::internal::checked_cast<const arrow::StructType&>(type).fields();
+            for (const auto& member : members) {
+                if (ContainTimestampWithTimezone(*member->type())) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        case arrow::Type::type::MAP: {
+            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(type);
+            const auto& key_field = map_type.key_field();
+            const auto& item_field = map_type.item_field();
+            // The key side is inspected first; the item side only if the key has no match.
+            return ContainTimestampWithTimezone(*key_field->type()) ||
+                   ContainTimestampWithTimezone(*item_field->type());
+        }
+        default: {
+            return false;
+        }
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/arrow_schema_validator.h
b/src/paimon/core/schema/arrow_schema_validator.h
new file mode 100644
index 0000000..6118445
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <set>
+#include <vector>
+
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace arrow {
+class DataType;
+class Field;
+class Schema;
+class KeyValueMetadata;
+} // namespace arrow
+
+namespace paimon {
+class PAIMON_EXPORT ArrowSchemaValidator {  // Static-only helpers that validate Arrow schemas, fields and types.
+ public:
+ ArrowSchemaValidator() = delete;  // Constructor and destructor are deleted: the class is never instantiated.
+ ~ArrowSchemaValidator() = delete;
+ static Status ValidateSchema(const arrow::Schema& schema);  // Duplicate-name, whitespace-only-name and type checks.
+ // ValidateSchema plus a uniqueness check of the field ids, nested fields included.
+ static Status ValidateSchemaWithFieldId(const arrow::Schema& schema);
+ // Fails if two sibling fields share a name; recurses into MAP/LIST/STRUCT children.
+ static Status ValidateNoRedundantFields(const arrow::FieldVector& fields);
+ // Fails if any field name, at any nesting level, is flagged by StringUtils::IsNullOrWhitespaceOnly.
+ static Status ValidateNoWhitespaceOnlyFields(const arrow::FieldVector&
fields);
+ // Fails if the type of `field`, or of any field nested inside it, is unsupported.
+ static Status ValidateField(const std::shared_ptr<arrow::Field>& field);
+ // True if `type` is, or contains at any depth, a timestamp with a non-empty timezone.
+ static bool ContainTimestampWithTimezone(const arrow::DataType& type);
+ // True for MAP, LIST and STRUCT types.
+ static bool IsNestedType(const std::shared_ptr<arrow::DataType>&
data_type);
+
+ private:
+ static Status ValidateDataTypeWithFieldId(  // Recursive worker behind ValidateSchemaWithFieldId.
+ const std::shared_ptr<arrow::DataType>& type,
+ const std::shared_ptr<const arrow::KeyValueMetadata>&
key_value_metadata,
+ std::set<int32_t>* field_id_set);  // in/out: field ids seen so far during the walk
+};
+} // namespace paimon
diff --git a/src/paimon/core/schema/arrow_schema_validator_test.cpp
b/src/paimon/core/schema/arrow_schema_validator_test.cpp
new file mode 100644
index 0000000..f1ec0e5
--- /dev/null
+++ b/src/paimon/core/schema/arrow_schema_validator_test.cpp
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/arrow_schema_validator.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(ArrowSchemaValidatorTest, TestSimple) {  // Happy path: a schema built from the column types below must pass ValidateSchema.
+ auto col1_field = arrow::field("col1", arrow::int64());  // col1-col10: primitive types
+ auto col2_field = arrow::field("col2", arrow::int32());
+ auto col3_field = arrow::field("col3", arrow::int16());
+ auto col4_field = arrow::field("col4", arrow::int8());
+ auto col5_field = arrow::field("col5", arrow::float64());
+ auto col6_field = arrow::field("col6", arrow::float32());
+ auto col7_field = arrow::field("col7", arrow::boolean());
+ auto col8_field = arrow::field("col8", arrow::utf8());
+ auto col9_field = arrow::field("col9", arrow::binary());
+ auto col10_field = arrow::field("col10", arrow::date32());
+ auto col11_field = arrow::field("col11", arrow::decimal128(20, 4));  // two decimal128 precision/scale combinations
+ auto col12_field = arrow::field("col12", arrow::decimal128(18, 5));
+ auto col13_field = arrow::field("col13", arrow::list(arrow::int64()));  // nested types: list, map and (col16) struct
+ auto col14_field = arrow::field("col14", arrow::map(arrow::utf8(),
arrow::int64()));
+ auto col15_field = arrow::field("col15",
arrow::timestamp(arrow::TimeUnit::NANO));
+ auto col16_field = arrow::field(
+ "col16",
+ arrow::struct_({arrow::field("sub1", arrow::int8()),
arrow::field("sub2", arrow::int16()),
+ arrow::field("sub3", arrow::int64())}));
+
+ auto arrow_schema = arrow::schema(
+ arrow::FieldVector({col1_field, col2_field, col3_field, col4_field,
col5_field, col6_field,
+ col7_field, col8_field, col9_field, col10_field,
col11_field,
+ col12_field, col13_field, col14_field,
col15_field, col16_field}));
+ ASSERT_OK(ArrowSchemaValidator::ValidateSchema(*arrow_schema));
+}
+
+TEST(ArrowSchemaValidatorTest, TestValidateNoRedundantFields) {  // Duplicate field names are rejected at top level and inside a struct.
+ auto col1_field = arrow::field("col1", arrow::int64());
+ auto col2_field = arrow::field("col2", arrow::int32());
+ auto col3_field = arrow::field("col3", arrow::int16());
+ {  // Case 1: all names distinct -> OK.
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field,
col2_field, col3_field}));
+
ASSERT_OK(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()));
+ }
+ {  // Case 2: col2 is listed twice at top level -> rejected, message names col2.
+ auto arrow_schema =
+ arrow::schema(arrow::FieldVector({col1_field, col2_field,
col3_field, col2_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()),
+ "validate schema failed: read schema has duplicate
field col2");
+ }
+ {  // Case 3: struct col4 has two members named sub1 -> rejected, message names sub1.
+ auto col4_field =
+ arrow::field("col4", arrow::struct_({arrow::field("sub1",
arrow::int8()),
+ arrow::field("sub1",
arrow::int16())}));
+ auto arrow_schema =
+ arrow::schema(arrow::FieldVector({col1_field, col2_field,
col3_field, col4_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateNoRedundantFields(arrow_schema->fields()),
+ "validate schema failed: read schema has duplicate
field sub1");
+ }
+}
+
+TEST(ArrowSchemaValidatorTest, TestValidateNoWhitespaceOnlyFields) {  // Whitespace-only names are rejected at top level and inside a struct.
+ auto col1_field = arrow::field("col1", arrow::int64());
+ auto col2_field = arrow::field("col2", arrow::int32());
+ auto col3_field = arrow::field("col3", arrow::int16());
+ {  // Case 1: every name has visible characters -> OK.
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field,
col2_field, col3_field}));
+
ASSERT_OK(ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()));
+ }
+ {  // Case 2: a top-level field whose name is only whitespace -> rejected.
+ auto col4_field = arrow::field(" ", arrow::int16());
+ auto arrow_schema =
+ arrow::schema(arrow::FieldVector({col1_field, col2_field,
col3_field, col4_field}));
+ ASSERT_NOK_WITH_MSG(
+
ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()),
+ "validate schema failed: read schema has whitespace-only field");
+ }
+ {  // Case 3: a struct member whose name is only whitespace -> rejected.
+ auto col4_field = arrow::field("col4",
arrow::struct_({arrow::field("sub1", arrow::int8()),
+ arrow::field("
", arrow::int16())}));
+ auto arrow_schema =
+ arrow::schema(arrow::FieldVector({col1_field, col2_field,
col3_field, col4_field}));
+ ASSERT_NOK_WITH_MSG(
+
ArrowSchemaValidator::ValidateNoWhitespaceOnlyFields(arrow_schema->fields()),
+ "validate schema failed: read schema has whitespace-only field");
+ }
+}
+
+TEST(ArrowSchemaValidatorTest, TestInvalidDataType) {  // Each case puts one unsupported type in a one-column schema and checks the error text.
+ {  // large_utf8 -> rejected.
+ auto col1_field = arrow::field("col1", arrow::large_utf8());
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type: large_string");
+ }
+ {  // Plain large_binary field (expected not to count as a blob field) -> rejected.
+ auto col1_field = arrow::field("col1", arrow::large_binary());
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type: large_binary");
+ }
+ {  // Unsigned integer -> rejected.
+ auto col1_field = arrow::field("col1", arrow::uint32());
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type: uint32");
+ }
+ {  // Sparse union -> rejected; the message carries the full type string.
+ auto union_type = arrow::sparse_union(
+ {arrow::field("_union_0", arrow::int32()),
arrow::field("_union_1", arrow::utf8())});
+ auto col1_field = arrow::field("col1", union_type);
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type:
sparse_union<_union_0: int32=0, "
+ "_union_1: string=1>");
+ }
+ {  // date64 -> rejected (date32 is the accepted date type in TestSimple).
+ auto col1_field = arrow::field("col1", arrow::date64());
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type: date64[ms]");
+ }
+ {  // decimal256 -> rejected (decimal128 is the accepted decimal type in TestSimple).
+ auto col1_field = arrow::field("col1", arrow::decimal256(10, 5));
+ auto arrow_schema = arrow::schema(arrow::FieldVector({col1_field}));
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchema(*arrow_schema),
+ "Unknown or unsupported arrow type: decimal256(10,
5)");
+ }
+}
+
+TEST(ArrowSchemaValidatorTest, ValidateDataTypeWithFieldId) {  // Field-id checks: unique ids pass; duplicate or missing ids and unsupported types fail.
+ {  // Case 1: flat schema with distinct ids 3, 0, 1 -> OK.
+ std::vector<DataField> fields = {DataField(3, arrow::field("f3",
arrow::float64())),
+ DataField(0, arrow::field("f0",
arrow::utf8())),
+ DataField(1, arrow::field("f1",
arrow::int32()))};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema));
+ }
+ {  // Case 2: struct f0 and map<string, struct> f1 with ids 0-8 all distinct -> OK.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3, arrow::field("sub_f3", arrow::int32()))};
+ std::vector<DataField> sub_fields1 = {
+ DataField(5, arrow::field("sub_f5", arrow::float64())),
+ DataField(6, arrow::field("sub_f6", arrow::utf8())),
+ DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ DataField field1 = DataField(
+ 4, arrow::field(
+ "f1", arrow::map(arrow::utf8(),
+
DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+ DataField field2 =
+ DataField(8, arrow::field("f2", arrow::map(arrow::int8(),
arrow::int16())));
+ std::vector<DataField> fields = {field0, field1, field2};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema))
+ <<
ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema).ToString();
+ }
+ {  // Case 3: id 0 is given to both f3 and f0 -> rejected as duplicate id 0.
+ std::vector<DataField> fields = {DataField(0, arrow::field("f3",
arrow::float64())),
+ DataField(0, arrow::field("f0",
arrow::utf8())),
+ DataField(1, arrow::field("f1",
arrow::int32()))};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+ "field id must be unique, duplicate field id 0");
+ }
+ {  // Case 4: plain arrow fields, not built through DataField -> rejected for missing field-id metadata.
+ arrow::FieldVector fields = {arrow::field("f3", arrow::float64()),
+ arrow::field("f0", arrow::utf8()),
+ arrow::field("f1", arrow::int32())};
+ auto arrow_schema = arrow::schema(fields);
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+ "invalid read schema, lack of metadata of field
id");
+ }
+ {  // Case 5: like case 2 but f1 is list<struct>; ids all distinct -> OK.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3, arrow::field("sub_f3", arrow::int32()))};
+ std::vector<DataField> sub_fields1 = {
+ DataField(5, arrow::field("sub_f5", arrow::float64())),
+ DataField(6, arrow::field("sub_f6", arrow::utf8())),
+ DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ DataField field1 = DataField(
+ 4, arrow::field(
+ "f1",
arrow::list(DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+ DataField field2 =
+ DataField(8, arrow::field("f2", arrow::map(arrow::int8(),
arrow::int16())));
+ std::vector<DataField> fields = {field0, field1, field2};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema))
+ <<
ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema).ToString();
+ }
+ {  // Case 6: sub_f6 inside the list reuses id 4 of its parent f1 -> rejected as duplicate id 4.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3, arrow::field("sub_f3", arrow::int32()))};
+ std::vector<DataField> sub_fields1 = {
+ DataField(5, arrow::field("sub_f5", arrow::float64())),
+ DataField(4, arrow::field("sub_f6", arrow::utf8())),
+ DataField(7, arrow::field("sub_f7", arrow::int32()))};
+
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ DataField field1 = DataField(
+ 4, arrow::field(
+ "f1",
arrow::list(DataField::ConvertDataFieldsToArrowStructType(sub_fields1))));
+ DataField field2 =
+ DataField(8, arrow::field("f2", arrow::map(arrow::int8(),
arrow::int16())));
+ std::vector<DataField> fields = {field0, field1, field2};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+ "field id must be unique, duplicate field id 4");
+ }
+ {  // Case 7: list<struct> whose members are plain arrow fields -> rejected for missing field-id metadata.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3, arrow::field("sub_f3", arrow::int32()))};
+ arrow::FieldVector invalid_sub_fields = {arrow::field("sub_f4",
arrow::float64()),
+ arrow::field("sub_f5",
arrow::utf8()),
+ arrow::field("sub_f6",
arrow::int32())};
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ DataField field1 =
+ DataField(4, arrow::field("f1",
arrow::list(arrow::struct_(invalid_sub_fields))));
+ DataField field2 =
+ DataField(5, arrow::field("f2", arrow::map(arrow::int8(),
arrow::int16())));
+ std::vector<DataField> fields = {field0, field1, field2};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+
ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema),
+ "invalid read schema, lack of metadata of field
id");
+ }
+ {  // Case 8: calls ValidateDataTypeWithFieldId directly; the large_utf8 member is rejected as unsupported.
+ std::vector<DataField> fields = {DataField(0, arrow::field("f0",
arrow::float64())),
+ DataField(1, arrow::field("f1",
arrow::large_utf8())),
+ DataField(2, arrow::field("f2",
arrow::int32()))};
+ auto struct_type =
DataField::ConvertDataFieldsToArrowStructType(fields);
+ std::set<int32_t> field_id_set;  // NOTE(review): ValidateDataTypeWithFieldId is declared private in the header - confirm how this test gains access.
+ ASSERT_NOK_WITH_MSG(ArrowSchemaValidator::ValidateDataTypeWithFieldId(
+ struct_type, /*key_value_metadata=*/nullptr,
&field_id_set),
+ "Unknown or unsupported arrow type: large_string");
+ }
+}
+
+TEST(ArrowSchemaValidatorTest, ContainTimestampWithTimezone) {  // A zoned timestamp must be detected at top level and inside struct, map and list.
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();  // zone name attached to the zoned timestamp types below
+ {  // Case 1: timestamp without timezone -> false.
+ std::vector<DataField> fields = {
+ DataField(0, arrow::field("f0", arrow::float64())),
+ DataField(1, arrow::field("f1", arrow::utf8())),
+ DataField(2, arrow::field("f2",
arrow::timestamp(arrow::TimeUnit::NANO)))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_FALSE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 2: zoned timestamp as a direct struct member -> true.
+ std::vector<DataField> fields = {
+ DataField(0, arrow::field("f0", arrow::float64())),
+ DataField(1, arrow::field("f1", arrow::utf8())),
+ DataField(2, arrow::field("f2",
arrow::timestamp(arrow::TimeUnit::NANO, timezone)))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 3: zoned timestamp inside a nested struct -> true.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3,
+ arrow::field("sub_f3",
arrow::timestamp(arrow::TimeUnit::NANO, timezone)))};
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ std::vector<DataField> fields = {field0, DataField(4,
arrow::field("f1", arrow::utf8()))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 4: nested struct without any timestamp -> false.
+ std::vector<DataField> sub_fields0 = {
+ DataField(1, arrow::field("sub_f1", arrow::float64())),
+ DataField(2, arrow::field("sub_f2", arrow::utf8())),
+ DataField(3, arrow::field("sub_f3", arrow::decimal128(20, 5)))};
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
DataField::ConvertDataFieldsToArrowStructType(sub_fields0)));
+ std::vector<DataField> fields = {field0, DataField(4,
arrow::field("f1", arrow::utf8()))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_FALSE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 5: zoned timestamp as map item (value) -> true.
+ DataField field0 = DataField(
+ 0, arrow::field("f0", arrow::map(arrow::int8(),
+
arrow::timestamp(arrow::TimeUnit::NANO, timezone))));
+ std::vector<DataField> fields = {field0, DataField(1,
arrow::field("f1", arrow::utf8()))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 6: zoned timestamp as map key -> true.
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
arrow::map(arrow::timestamp(arrow::TimeUnit::NANO, timezone),
+ arrow::int8())));
+ std::vector<DataField> fields = {field0, DataField(1,
arrow::field("f1", arrow::utf8()))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+ {  // Case 7: zoned timestamp as list element -> true.
+ DataField field0 = DataField(
+ 0, arrow::field("f0",
arrow::list(arrow::timestamp(arrow::TimeUnit::NANO, timezone))));
+ std::vector<DataField> fields = {field0, DataField(1,
arrow::field("f1", arrow::utf8()))};
+ std::shared_ptr<arrow::DataType> arrow_data_type =
+ DataField::ConvertDataFieldsToArrowStructType(fields);
+
ASSERT_TRUE(ArrowSchemaValidator::ContainTimestampWithTimezone(*arrow_data_type));
+ }
+}
+} // namespace paimon::test
diff --git a/src/paimon/core/schema/schema_validation.cpp
b/src/paimon/core/schema/schema_validation.cpp
new file mode 100644
index 0000000..28949a4
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation.cpp
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_validation.h"
+
+#include <algorithm>
+#include <cassert>
+#include <map>
+#include <optional>
+#include <set>
+#include <sstream>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/preconditions.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/options/changelog_producer.h"
+#include "paimon/core/options/expire_config.h"
+#include "paimon/core/options/merge_engine.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/defs.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
// Returns true when the Arrow type id of the field is TIMESTAMP or DECIMAL, or when
// BlobUtils::IsBlobField reports the field as a blob field.
// Within this file the predicate is only consumed by ValidateNotContainComplexType,
// which uses it to reject such fields as partition keys.
+bool SchemaValidation::IsComplexType(const std::shared_ptr<arrow::Field>&
field) {
+ return (field->type()->id() == arrow::Type::TIMESTAMP ||
+ field->type()->id() == arrow::Type::DECIMAL ||
BlobUtils::IsBlobField(field));
+}
+
// Entry point: runs all schema-level checks against a TableSchema, in the order written
// below, and returns Status::OK() only when none of them fails.
// Each PAIMON_RETURN_NOT_OK / PAIMON_ASSIGN_OR_RAISE step is expected to return early with
// the failing status (the macros are defined elsewhere).
+Status SchemaValidation::ValidateTableSchema(const TableSchema& schema) {
+ const auto& field_names = schema.FieldNames();
// Bucket, primary and partition key lists must not repeat a field name.
+ PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(schema.BucketKeys(), "bucket
key"));
+ PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(schema.PrimaryKeys(),
"primary key"));
+ PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(schema.PartitionKeys(),
"partition key"));
// Partition keys and primary keys must all be columns of the table.
+ PAIMON_RETURN_NOT_OK(
+ Preconditions::CheckState(ObjectUtils::ContainsAll(field_names,
schema.PartitionKeys()),
+ "Table column {} should include all
partition fields {}",
+ field_names, schema.PartitionKeys()));
+ PAIMON_RETURN_NOT_OK(
+ Preconditions::CheckState(ObjectUtils::ContainsAll(field_names,
schema.PrimaryKeys()),
+ "Table column {} should include all primary
key constraint {}",
+ field_names, schema.PrimaryKeys()));
+
// Key columns may not use nested Arrow types (see ValidateOnlyContainPrimitiveType).
+ PAIMON_RETURN_NOT_OK(
+ ValidateOnlyContainPrimitiveType(schema.Fields(),
schema.PrimaryKeys(), "primary key"));
+ PAIMON_RETURN_NOT_OK(
+ ValidateOnlyContainPrimitiveType(schema.Fields(),
schema.PartitionKeys(), "partition"));
+ // TODO(lisizhuo.lsz): C++ Paimon do not support timestamp & decimal type
in partition keys for
+ // now.
+ PAIMON_RETURN_NOT_OK(ValidateNotContainComplexType(schema.Fields(),
schema.PartitionKeys()));
+
// All remaining checks work on the typed CoreOptions view of the schema options map.
+ PAIMON_ASSIGN_OR_RAISE(CoreOptions options,
CoreOptions::FromMap(schema.Options()));
+ PAIMON_RETURN_NOT_OK(ValidateBucket(schema, options));
+ // PAIMON_RETURN_NOT_OK(ValidateDefaultValues(schema));
+ // PAIMON_RETURN_NOT_OK(ValidateStartupMode(options));
+ PAIMON_RETURN_NOT_OK(ValidateFieldsPrefix(schema, options));
+ PAIMON_RETURN_NOT_OK(ValidateSequenceField(schema, options));
+ PAIMON_RETURN_NOT_OK(ValidateSequenceGroup(schema, options));
+
// A changelog producer other than NONE is rejected for tables without primary keys.
// ValidateChangelogProducer below then rejects any non-NONE producer regardless of keys.
+ ChangelogProducer changelog_producer = options.GetChangelogProducer();
+ if (schema.PrimaryKeys().empty() && changelog_producer !=
ChangelogProducer::NONE) {
+ return Status::Invalid(
+ fmt::format("Can not set {} on table without primary keys, please
define primary keys.",
+ Options::CHANGELOG_PRODUCER));
+ }
+ PAIMON_RETURN_NOT_OK(ValidateChangelogProducer(options));
// Snapshot retention bounds: min must be positive and must not exceed max.
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ options.GetExpireConfig().GetSnapshotRetainMin() > 0,
+ std::string(Options::SNAPSHOT_NUM_RETAINED_MIN) + " should be at least
1"));
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ options.GetExpireConfig().GetSnapshotRetainMin() <=
+ options.GetExpireConfig().GetSnapshotRetainMax(),
+ std::string(Options::SNAPSHOT_NUM_RETAINED_MIN) + " should not be
larger than " +
+ std::string(Options::SNAPSHOT_NUM_RETAINED_MAX)));
+
+ // TODO(yonghao.fyh): check changelog num retain
+ // TODO(yonghao.fyh): support file format validate data fields
// Column names may neither be a special field name nor start with the key field prefix.
+ for (const auto& field_name : field_names) {
+ if (SpecialFields::IsSpecialFieldName(field_name)) {
+ return Status::Invalid(
+ fmt::format("field name '{}' in schema cannot be special
field.", field_name));
+ }
+ if (StringUtils::StartsWith(field_name,
SpecialFields::KEY_FIELD_PREFIX)) {
+ return Status::Invalid(fmt::format("field name '{}' in schema
cannot start with '{}'.",
+ field_name,
SpecialFields::KEY_FIELD_PREFIX));
+ }
+ }
+ // TODO(yonghao.fyh): check streaming read overwrite
+ // TODO(yonghao.fyh): check 'partition.expiration-time'
+ // TODO(yonghao.fyh): check 'rowkind.field'
// Deletion-vector constraints only apply when the feature is switched on.
+ if (options.DeletionVectorsEnabled()) {
+ PAIMON_RETURN_NOT_OK(ValidateForDeletionVectors(options));
+ }
+
+ PAIMON_RETURN_NOT_OK(ValidateRowTracking(schema, options));
+ PAIMON_RETURN_NOT_OK(ValidateBlobFields(schema, options));
+ return Status::OK();
+}
+
// Checks that field_names contains no repeated entry.
// The duplicates come from ObjectUtils::DuplicateItems; when that result is non-empty the
// CheckState message lists the full input and the duplicates, prefixed with
// error_message_intro (callers in this file pass the key kind or the option key).
+Status SchemaValidation::ValidateNoDuplicateField(const
std::vector<std::string>& field_names,
+ const std::string&
error_message_intro) {
+ auto duplicate_field_names = ObjectUtils::DuplicateItems(field_names);
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ duplicate_field_names.empty(),
+ fmt::format("{} [{}] must not contain duplicate fields. Found: [{}]",
error_message_intro,
+ fmt::join(field_names, ", "),
fmt::join(duplicate_field_names, ", "))));
+ return Status::OK();
+}
+
// Rejects any name in field_names whose column type is nested according to
// ArrowSchemaValidator::IsNestedType.
//   fields               all columns of the table (name and Arrow type are read)
//   field_names          the subset of column names to check; empty means nothing to do
//   error_message_intro  label inserted into the error text to describe the key kind
// Returns Invalid for the first nested column found, or for a name missing from fields.
+Status SchemaValidation::ValidateOnlyContainPrimitiveType(
+ const std::vector<DataField>& fields, const std::vector<std::string>&
field_names,
+ const std::string& error_message_intro) {
+ if (field_names.empty()) {
+ return Status::OK();
+ }
// Index the column types by name for the lookups below.
+ std::unordered_map<std::string, std::shared_ptr<arrow::DataType>>
fields_map;
+ for (const auto& field : fields) {
+ fields_map[field.Name()] = field.Type();
+ }
+ for (const auto& field_name : field_names) {
+ auto it = fields_map.find(field_name);
+ if (it != fields_map.end()) {
+ auto data_type = it->second;
+ if (ArrowSchemaValidator::IsNestedType(data_type)) {
+ return Status::Invalid(fmt::format("The type {} in {} field {}
is unsupported",
+ data_type->ToString(),
error_message_intro,
+ it->first));
+ }
+ } else {
// Not expected to happen: ValidateTableSchema runs its ContainsAll checks before calling
// this function. The assert fires in builds with assertions enabled; otherwise Invalid
// is returned.
+ assert(false);
+ return Status::Invalid(
+ fmt::format("unexpected error, field {} not found in fields
map", field_name));
+ }
+ }
+ return Status::OK();
+}
+
// Rejects any name in field_names whose column satisfies IsComplexType (timestamp, decimal
// or blob field). The only caller in this file passes the partition keys, and the error
// texts are worded for partition fields.
// Returns Invalid for the first offending column, or for a name missing from fields.
+Status SchemaValidation::ValidateNotContainComplexType(
+ const std::vector<DataField>& fields, const std::vector<std::string>&
field_names) {
+ if (field_names.empty()) {
+ return Status::OK();
+ }
// Index the Arrow fields by column name for the lookups below.
+ std::unordered_map<std::string, std::shared_ptr<arrow::Field>> fields_map;
+ for (const auto& field : fields) {
+ fields_map[field.Name()] = field.ArrowField();
+ }
+ for (const auto& field_name : field_names) {
+ auto it = fields_map.find(field_name);
+ if (it != fields_map.end()) {
+ auto field = it->second;
+ if (IsComplexType(field)) {
+ return Status::Invalid(
+ fmt::format("The field {} in partition field {} is
unsupported",
+ field->ToString(), it->first));
+ }
+ } else {
// Not expected to happen: ValidateTableSchema checks that all partition keys are table
// columns before calling this function.
+ assert(false);
+ return Status::Invalid(fmt::format(
+ "unexpected error, partition field {} not found in schema",
field_name));
+ }
+ }
+ return Status::OK();
+}
+
// Returns true when the schema declares at least one primary key and bucket equals
// BucketModeDefine::POSTPONE_BUCKET.
+bool SchemaValidation::IsPostponeBucketTable(const TableSchema& schema,
int32_t bucket) {
+ return !schema.PrimaryKeys().empty() && bucket ==
BucketModeDefine::POSTPONE_BUCKET;
+}
+
// Validates the bucket option against the key layout of the schema. Three cases:
//   bucket == -1                      the bucket-key option must be absent, and tables without
//                                     primary keys must not set full-compaction.delta-commits
//   bucket < 1 and not postpone mode  rejected as an invalid bucket count
//   otherwise                         cross-partition update is rejected, tables without
//                                     primary keys need bucket keys, and bucket keys must not
//                                     be nested types
+Status SchemaValidation::ValidateBucket(const TableSchema& schema, const
CoreOptions& options) {
+ int32_t bucket = options.GetBucket();
+ if (bucket == -1) {
// NOTE(review): options.ToMap() is called twice in this branch; whether it copies the map
// depends on its return type, which is not visible here.
+ if (options.ToMap().count(Options::BUCKET_KEY)) {
+ return Status::Invalid(
+ fmt::format("Cannot define '{}' with bucket -1, please specify
a bucket number.",
+ Options::BUCKET_KEY));
+ }
+ if (schema.PrimaryKeys().empty() &&
+ options.ToMap().count("full-compaction.delta-commits")) {
+ return Status::Invalid(
+ "AppendOnlyTable of unware or dynamic bucket does not support "
+ "'full-compaction.delta-commits'");
+ }
+ } else if (bucket < 1 && !IsPostponeBucketTable(schema, bucket)) {
+ return Status::Invalid("The number of buckets needs to be greater than
0.");
+ } else {
// Reached for bucket >= 1 and for postpone-bucket tables.
+ if (schema.CrossPartitionUpdate()) {
+ return Status::Invalid(fmt::format(
+ "You should use dynamic bucket (bucket = -1) mode in cross
partition update case "
+ "(Primary key constraint '{}' not include all partition fields
'{}').",
+ fmt::join(schema.PrimaryKeys(), ", "),
fmt::join(schema.PartitionKeys(), ", ")));
+ }
+ if (schema.PrimaryKeys().empty() && schema.BucketKeys().empty()) {
+ return Status::Invalid("You should define a 'bucket-key' for
bucketed append mode.");
+ }
+ if (!schema.BucketKeys().empty()) {
+ std::vector<std::string> bucket_keys = schema.BucketKeys();
+ std::vector<std::string> nested_fields;
+
// Collect every bucket key whose column type is nested so the error can list all of them.
+ for (const auto& field : schema.Fields()) {
+ if (std::find(bucket_keys.begin(), bucket_keys.end(),
field.Name()) !=
+ bucket_keys.end() &&
+ ArrowSchemaValidator::IsNestedType(field.Type())) {
+ nested_fields.push_back(field.Name());
+ }
+ }
+
+ if (!nested_fields.empty()) {
+ return Status::Invalid(fmt::format(
+ "Nested type cannot be in bucket-key, in your table these
keys are: {}",
+ fmt::join(nested_fields, ", ")));
+ }
+ }
+ }
+ return Status::OK();
+}
+
// Accepts only ChangelogProducer::NONE; any other configured producer fails the CheckState
// with a message stating that the C++ implementation does not support it yet.
+Status SchemaValidation::ValidateChangelogProducer(const CoreOptions& options)
{
+ return Preconditions::CheckState(options.GetChangelogProducer() ==
ChangelogProducer::NONE,
+ "C++ Paimon does not support
changelog-producer yet. Please "
+ "keep changelog-producer as 'none'.");
+}
+
// Constraints checked when deletion vectors are enabled (the caller in this file guards the
// call with options.DeletionVectorsEnabled()):
//   - the changelog producer must be NONE, INPUT or LOOKUP
//   - the merge engine must not be FIRST_ROW
// NOTE(review): ValidateTableSchema calls ValidateChangelogProducer first, which only accepts
// NONE, so the first check looks redundant on that path - confirm before removing.
+Status SchemaValidation::ValidateForDeletionVectors(const CoreOptions&
options) {
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ options.GetChangelogProducer() == ChangelogProducer::NONE ||
+ options.GetChangelogProducer() == ChangelogProducer::INPUT ||
+ options.GetChangelogProducer() == ChangelogProducer::LOOKUP,
+ "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP
changelog producer now."));
+ return Preconditions::CheckState(
+ options.GetMergeEngine() != MergeEngine::FIRST_ROW,
+ "First row merge engine does not need deletion vectors because there
is "
+ "no deletion of old data in this merge engine.");
+}
+
// Validates the sequence-group configuration returned by options.GetFieldsSequenceGroups().
// Each map entry is read as: key = sequence field names, value = fields governed by that
// group, both joined by Options::FIELDS_SEPARATOR. The checks are:
//   1. every sequence field name exists in the table schema
//   2. every governed field exists in the table schema
//   3. no governed field is claimed by more than one group
//   4. no sequence field of any group has an aggregation function configured
+Status SchemaValidation::ValidateSequenceGroup(const TableSchema& schema,
+ const CoreOptions& options) {
// Maps a governed field to the set of sequence field names of the group that claimed it.
+ std::unordered_map<std::string, std::set<std::string>> fields2_group;
+ auto sequence_groups_map = options.GetFieldsSequenceGroups();
+ const std::vector<std::string>& field_names = schema.FieldNames();
+ for (const auto& [k, v] : sequence_groups_map) {
+ std::vector<std::string> sequence_field_names =
+ StringUtils::Split(k, Options::FIELDS_SEPARATOR);
+ for (const auto& sequence_field_name : sequence_field_names) {
+ if (std::find(field_names.begin(), field_names.end(),
sequence_field_name) ==
+ field_names.end()) {
+ return Status::Invalid(
+ fmt::format("The sequence field group: {} can not be found
in table schema.",
+ sequence_field_name));
+ }
+ }
+
+ for (const auto& field : StringUtils::Split(v,
Options::FIELDS_SEPARATOR)) {
+ if (std::find(field_names.begin(), field_names.end(), field) ==
field_names.end()) {
+ return Status::Invalid(
+ fmt::format("Field {} can not be found in table schema.",
field));
+ }
+
// The field was already claimed by an earlier group: report both groups in the error.
+ if (fields2_group.count(field)) {
+ std::vector<std::vector<std::string>> sequence_groups;
+ sequence_groups.emplace_back(fields2_group[field].begin(),
+ fields2_group[field].end());
+ sequence_groups.push_back(sequence_field_names);
+
+ std::ostringstream sequence_groups_msg;
+ for (const auto& group : sequence_groups) {
+ sequence_groups_msg << "{";
+ for (const auto& group_field : group) {
+ sequence_groups_msg << group_field << " ";
+ }
+ sequence_groups_msg << "} ";
+ }
+ return Status::Invalid(
+ fmt::format("Field {} is defined repeatedly by multiple
groups: {}.", field,
+ sequence_groups_msg.str()));
+ }
+ fields2_group[field].insert(sequence_field_names.begin(),
sequence_field_names.end());
+ }
+ }
+
// Second pass: group.second holds sequence field names; none of them may have an
// aggregation function. Offenders are gathered in a std::set, so each is listed once
// and in sorted order.
+ std::set<std::string> illegal_group;
+ for (const auto& group : fields2_group) {
+ for (const auto& field : group.second) {
+ PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> agg_func,
+ options.GetFieldAggFunc(field));
+ if (agg_func) {
+ illegal_group.insert(field);
+ }
+ }
+ }
+
+ if (!illegal_group.empty()) {
+ std::ostringstream illegal_group_msg;
+ illegal_group_msg << "Should not define aggregation function on
sequence group: ";
+ for (const auto& field : illegal_group) {
+ illegal_group_msg << field << " ";
+ }
+ return Status::Invalid(illegal_group_msg.str());
+ }
+ return Status::OK();
+}
+
// Validates the list returned by options.GetSequenceField(). Nothing is checked when the
// list is empty. Otherwise, for each listed field:
//   - it must be a column of the table schema
//   - it must not have an aggregation function configured
//   - it must appear exactly once in the list
// In addition the FIRST_ROW merge engine and cross-partition update are rejected.
+Status SchemaValidation::ValidateSequenceField(const TableSchema& schema,
+ const CoreOptions& options) {
+ std::vector<std::string> sequence_field = options.GetSequenceField();
+ if (!sequence_field.empty()) {
+ // Create field count map
+ std::unordered_map<std::string, int> field_count;
+ for (const auto& field : sequence_field) {
+ field_count[field]++;
+ }
+
+ const auto& field_names = schema.FieldNames();
+ for (const auto& field : sequence_field) {
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ std::find(field_names.begin(), field_names.end(), field) !=
field_names.end(),
+ fmt::format("Sequence field: '{}' cannot be found in table
schema.", field)));
+
+ PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> agg_func,
+ options.GetFieldAggFunc(field));
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ agg_func == std::nullopt,
+ fmt::format("Should not define aggregation on sequence field:
'{}'.", field)));
+
// Duplicate detection uses the occurrence counts gathered before the loop.
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ field_count[field] == 1, "Sequence field '" + field + "' is
defined repeatedly."));
+ }
+
+ // Check for FIRST_ROW merge engine
+ if (options.GetMergeEngine() == MergeEngine::FIRST_ROW) {
+ return Status::Invalid(
+ "Do not support using sequence field on FIRST_ROW merge
engine.");
+ }
+
+ // Check for cross partition update
+ if (schema.CrossPartitionUpdate()) {
+ return Status::Invalid(fmt::format(
+ "You cannot use sequence.field in cross partition update case
(Primary "
+ "key constraint '{}' not including all partition fields
'{}').",
+ fmt::join(schema.PrimaryKeys(), ", "),
fmt::join(schema.PartitionKeys(), ", ")));
+ }
+ }
+ return Status::OK();
+}
+
// Validates every option whose key starts with Options::FIELDS_PREFIX.
// The key is split on the dot character; the second segment is taken as a list of field
// names joined by Options::FIELDS_SEPARATOR. Each name must either be a column of the
// table schema or equal Options::DEFAULT_AGG_FUNCTION.
// NOTE(review): splitting on the dot means a column name that itself contains a dot would
// be truncated here - confirm whether such names are possible.
+Status SchemaValidation::ValidateFieldsPrefix(const TableSchema& schema,
+ const CoreOptions& options) {
+ const auto& field_names = schema.FieldNames();
+ const auto& options_map = options.ToMap();
+ for (const auto& [k, v] : options_map) {
+ if (StringUtils::StartsWith(k, Options::FIELDS_PREFIX)) {
+ std::vector<std::string> cols = StringUtils::Split(k, ".");
// A key with fewer than two segments carries no field name and is rejected.
+ if (cols.size() < 2) {
+ return Status::Invalid("invalid options key " + k);
+ }
+ std::vector<std::string> fields =
+ StringUtils::Split(cols[1], Options::FIELDS_SEPARATOR);
+ for (const auto& field : fields) {
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ Options::DEFAULT_AGG_FUNCTION == field ||
+ std::find(field_names.begin(), field_names.end(),
field) !=
+ field_names.end(),
+ "Field " + field + " can not be found in table schema."));
+ }
+ }
+ }
+ return Status::OK();
+}
+
// Validates row-tracking, data-evolution and BLOB-column constraints:
//   - row tracking requires bucket == -1 and no primary keys
//   - data evolution requires row tracking and is incompatible with deletion vectors
//   - a table with BLOB columns must not use them as partition keys, must enable data
//     evolution, and must contain at least one non-BLOB column
// BLOB columns are detected from the schema itself via BlobUtils::IsBlobField, not from the
// blob-related options.
+Status SchemaValidation::ValidateRowTracking(const TableSchema& table_schema,
+ const CoreOptions& options) {
+ bool row_tracking_enabled = options.RowTrackingEnabled();
+ if (row_tracking_enabled) {
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ options.GetBucket() == -1,
+ "Cannot define {} for row tracking table, it only support bucket =
-1",
+ Options::BUCKET));
+ PAIMON_RETURN_NOT_OK(
+ Preconditions::CheckState(table_schema.PrimaryKeys().empty(),
+ "Cannot define primary key for row
tracking table"));
+ }
+ if (options.DataEvolutionEnabled()) {
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ row_tracking_enabled, "Data evolution config must enabled with
row-tracking.enabled"));
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ !options.DeletionVectorsEnabled(),
+ "Data evolution config must disabled with
deletion-vectors.enabled"));
+ }
+
// Collect the names of all BLOB columns present in the schema.
+ std::vector<std::string> blob_names;
+ for (const auto& field : table_schema.Fields()) {
+ if (BlobUtils::IsBlobField(field.ArrowField())) {
+ blob_names.push_back(field.Name());
+ }
+ }
+ if (!blob_names.empty()) {
+ // Validate blob fields cannot be partition keys
// NOTE(review): the begin() and end() iterators below come from separate PartitionKeys()
// calls. That is only well-defined if PartitionKeys() returns a reference to a stored
// vector - confirm in table_schema.h, or bind the result to a local first.
+ for (const auto& blob_field_name : blob_names) {
+ if (std::find(table_schema.PartitionKeys().begin(),
table_schema.PartitionKeys().end(),
+ blob_field_name) !=
table_schema.PartitionKeys().end()) {
+ return Status::Invalid(
+ fmt::format("Blob field {} cannot be a partition key.",
blob_field_name));
+ }
+ }
+
+ // Validate data evolution must be enabled when blob-field is
configured
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ options.DataEvolutionEnabled(),
+ "Data evolution config must be enabled for table with BLOB type
column."));
// Strictly more columns than BLOB columns means at least one non-BLOB column exists.
+ PAIMON_RETURN_NOT_OK(Preconditions::CheckState(
+ table_schema.Fields().size() > blob_names.size(),
+ "Table with BLOB type column must have other normal columns."));
+ }
+ return Status::OK();
+}
+
// Validates the four blob-related field-list options (blob, blob descriptor, blob view and
// blob external storage) against the schema and against each other:
//   - each list must be duplicate-free and name only BLOB columns of the schema
//   - a field must not be listed as both blob view and blob descriptor
//   - every external-storage field must also be a blob descriptor field
//   - if any external-storage field is configured, the external storage path must be set
//     and non-empty
// Returns OK immediately when none of the four lists is configured.
+Status SchemaValidation::ValidateBlobFields(const TableSchema& schema, const
CoreOptions& options) {
+ const auto& configured_blob_names = options.GetBlobFields();
+ const auto& blob_descriptor_names = options.GetBlobDescriptorFields();
+ const auto& blob_view_names = options.GetBlobViewFields();
+ const auto& blob_external_storage_names =
options.GetBlobExternalStorageFields();
// Concatenation of the blob, descriptor and view lists; it is only used for the emptiness
// test below.
+ std::vector<std::string> configured_blob_like_names =
configured_blob_names;
+ configured_blob_like_names.insert(configured_blob_like_names.end(),
+ blob_descriptor_names.begin(),
blob_descriptor_names.end());
+ configured_blob_like_names.insert(configured_blob_like_names.end(),
blob_view_names.begin(),
+ blob_view_names.end());
+ if (configured_blob_like_names.empty() &&
blob_external_storage_names.empty()) {
+ return Status::OK();
+ }
+
// Shared per-list check: no duplicates, every name resolves through schema.GetFields
// (a failure there is propagated), and every resolved column is a BLOB field.
+ auto validate_blob_fields = [&](const std::vector<std::string>&
field_names,
+ const std::string& option_key) -> Status {
+ if (field_names.empty()) {
+ return Status::OK();
+ }
+ PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(field_names,
option_key));
+ PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> blob_fields,
schema.GetFields(field_names));
+ for (const auto& blob_field : blob_fields) {
+ if (!BlobUtils::IsBlobField(blob_field.ArrowField())) {
+ return Status::Invalid(
+ fmt::format("Field '{}' in '{}' must be a BLOB field in
table schema.",
+ blob_field.Name(), option_key));
+ }
+ }
+ return Status::OK();
+ };
+
+ PAIMON_RETURN_NOT_OK(validate_blob_fields(configured_blob_names,
Options::BLOB_FIELD));
+ PAIMON_RETURN_NOT_OK(
+ validate_blob_fields(blob_descriptor_names,
Options::BLOB_DESCRIPTOR_FIELD));
+ PAIMON_RETURN_NOT_OK(validate_blob_fields(blob_view_names,
Options::BLOB_VIEW_FIELD));
+ PAIMON_RETURN_NOT_OK(
+ validate_blob_fields(blob_external_storage_names,
Options::BLOB_EXTERNAL_STORAGE_FIELD));
+
// Cross-list rules are evaluated against the set of descriptor field names.
+ std::set<std::string>
blob_descriptor_name_set(blob_descriptor_names.begin(),
+
blob_descriptor_names.end());
+ for (const auto& blob_view_name : blob_view_names) {
+ if (blob_descriptor_name_set.count(blob_view_name) > 0) {
+ return Status::Invalid(fmt::format("Field '{}' in '{}' can not
also be in '{}'.",
+ blob_view_name,
Options::BLOB_VIEW_FIELD,
+
Options::BLOB_DESCRIPTOR_FIELD));
+ }
+ }
+
+ for (const auto& blob_external_storage_name : blob_external_storage_names)
{
+ if (blob_descriptor_name_set.count(blob_external_storage_name) == 0) {
+ return Status::Invalid(
+ fmt::format("Field '{}' in '{}' must also be in '{}'.",
blob_external_storage_name,
+ Options::BLOB_EXTERNAL_STORAGE_FIELD,
Options::BLOB_DESCRIPTOR_FIELD));
+ }
+ }
+ if (!blob_external_storage_names.empty()) {
+ auto external_storage_path = options.GetBlobExternalStoragePath();
+ if (!external_storage_path || external_storage_path->empty()) {
+ return Status::Invalid(fmt::format("'{}' must be set when '{}' is
configured.",
+
Options::BLOB_EXTERNAL_STORAGE_PATH,
+
Options::BLOB_EXTERNAL_STORAGE_FIELD));
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/schema_validation.h
b/src/paimon/core/schema/schema_validation.h
new file mode 100644
index 0000000..af73ee6
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class DataType;
+class Field;
+} // namespace arrow
+
+namespace paimon {
+class CoreOptions;
+class DataField;
+class TableSchema;
+
+/// Validation utils for `TableSchema`.
/// All members are static; the constructor and destructor are deleted, so the class cannot
/// be instantiated and only serves as a scope for the functions below.
+class SchemaValidation {
+ public:
+ SchemaValidation() = delete;
+ ~SchemaValidation() = delete;
+
/// Runs every schema-level check against `schema`; this is the public entry point.
+ static Status ValidateTableSchema(const TableSchema& schema);
+
/// Tells whether `schema` together with `bucket` describes a postpone-bucket table.
+ static bool IsPostponeBucketTable(const TableSchema& schema, int32_t
bucket);
+
+ private:
/// Fails when `field_names` contains a repeated name; `error_message_intro` labels the
/// list in the error text.
+ static Status ValidateNoDuplicateField(const std::vector<std::string>&
field_names,
+ const std::string&
error_message_intro);
/// Checks the columns named in `field_names` against `fields`; the name indicates that
/// only primitive (non-nested) types are accepted.
+ static Status ValidateOnlyContainPrimitiveType(const
std::vector<DataField>& fields,
+ const
std::vector<std::string>& field_names,
+ const std::string&
error_message_intro);
/// Checks the columns named in `field_names` against `fields`; see `IsComplexType` for
/// the predicate presumably applied - confirm in the implementation file.
+ static Status ValidateNotContainComplexType(const std::vector<DataField>&
fields,
+ const
std::vector<std::string>& field_names);
/// Validates the bucket configuration in `options` against `schema`.
+ static Status ValidateBucket(const TableSchema& schema, const CoreOptions&
options);
/// Placeholder: always returns NotImplemented.
+ static Status ValidateDefaultValues(const TableSchema& schema) {
+ return Status::NotImplemented("validate default values not
implemented");
+ }
/// Placeholder: always returns NotImplemented.
+ static Status ValidateStartupMode(const CoreOptions& options) {
+ return Status::NotImplemented("validate startup mode not implemented");
+ }
+ static Status ValidateFieldsPrefix(const TableSchema& schema, const
CoreOptions& options);
+ static Status ValidateSequenceField(const TableSchema& schema, const
CoreOptions& options);
+ static Status ValidateSequenceGroup(const TableSchema& schema, const
CoreOptions& options);
+ static Status ValidateChangelogProducer(const CoreOptions& options);
+ static Status ValidateForDeletionVectors(const CoreOptions& options);
+
+ static Status ValidateRowTracking(const TableSchema& table_schema, const
CoreOptions& options);
+
+ static Status ValidateBlobFields(const TableSchema& schema, const
CoreOptions& options);
+
/// Predicate over a single Arrow field; defined in the implementation file.
+ static bool IsComplexType(const std::shared_ptr<arrow::Field>& field);
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/schema_validation_test.cpp
b/src/paimon/core/schema/schema_validation_test.cpp
new file mode 100644
index 0000000..0a08efe
--- /dev/null
+++ b/src/paimon/core/schema/schema_validation_test.cpp
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_validation.h"
+
+#include <map>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/defs.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
// Happy path: a three-column table (utf8, int32, float64) with primary keys f0 and f1,
// partition key f1, a fixed bucket count of 2 and bucket key f0 is expected to pass
// ValidateTableSchema.
+TEST(SchemaValidationTest, TestSimple) {
+ auto f0 = arrow::field("f0", arrow::utf8());
+ auto f1 = arrow::field("f1", arrow::int32());
+ auto f2 = arrow::field("f2", arrow::float64());
+ arrow::FieldVector fields = {f0, f1, f2};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {"f0", "f1"};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+ {Options::BUCKET_KEY, "f0"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+}
+
// Happy path for row tracking: a table without primary keys, with bucket set to -1 and both
// the row-tracking and data-evolution options set to true, is expected to pass
// ValidateTableSchema.
+TEST(SchemaValidationTest, TestRowTracking) {
+ auto f0 = arrow::field("f0", arrow::utf8());
+ auto f1 = arrow::field("f1", arrow::int32());
+ auto f2 = arrow::field("f2", arrow::float64());
+ arrow::FieldVector fields = {f0, f1, f2};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {
+ {Options::BUCKET, "-1"},
+ {Options::ROW_TRACKING_ENABLED, "true"},
+ {Options::DATA_EVOLUTION_ENABLED, "true"},
+ };
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+}
+
+TEST(SchemaValidationTest, TestWithBlobField) {
+ auto f0 = arrow::field("f0", arrow::utf8());
+ auto f1 = arrow::field("f1", arrow::int32());
+ auto f2 = arrow::field("f2", arrow::float64());
+ std::shared_ptr<arrow::Field> f3 = BlobUtils::ToArrowField("f3", false);
+ std::shared_ptr<arrow::Field> f4 = BlobUtils::ToArrowField("f4", false);
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"f3"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"f3,f4"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {
+ {Options::BUCKET, "-1"},
+ {Options::ROW_TRACKING_ENABLED, "true"},
+ {Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+ {Options::BLOB_VIEW_FIELD, "f4"},
+ {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"},
+ {Options::BLOB_EXTERNAL_STORAGE_PATH,
"FILE:///tmp/blob_external_storage/"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+
{Options::BLOB_DESCRIPTOR_FIELD, "f0"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_NOK_WITH_MSG(
+ SchemaValidation::ValidateTableSchema(*table_schema),
+ "Field 'f0' in 'blob-descriptor-field' must be a BLOB field in
table schema.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+
{Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+
{Options::BLOB_VIEW_FIELD, "f3"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_NOK_WITH_MSG(
+ SchemaValidation::ValidateTableSchema(*table_schema),
+ "Field 'f3' in 'blob-view-field' can not also be in
'blob-descriptor-field'.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3, f4};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {
+ {Options::BUCKET, "-1"},
+ {Options::ROW_TRACKING_ENABLED, "true"},
+ {Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+ {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f4"},
+ {Options::BLOB_EXTERNAL_STORAGE_PATH,
"FILE:///tmp/blob_external_storage/"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_NOK_WITH_MSG(
+ SchemaValidation::ValidateTableSchema(*table_schema),
+ "Field 'f4' in 'blob-external-storage-field' must also be in
'blob-descriptor-field'.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+
{Options::BLOB_DESCRIPTOR_FIELD, "f3"},
+
{Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+ "'blob-external-storage-path' must be set when "
+ "'blob-external-storage-field' is configured.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "false"},
+ {Options::BLOB_FIELD,
"f3"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+ ASSERT_NOK_WITH_MSG(
+ SchemaValidation::ValidateTableSchema(*table_schema),
+ "Data evolution config must be enabled for table with BLOB type
column.");
+ }
+ {
+ arrow::FieldVector fields = {f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"f3"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+ "Table with BLOB type column must have other
normal columns.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"non-exist"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+ "Get field non-exist failed: not exist in table
schema");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f1"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"f3,f0"}};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+ "Field 'f0' in 'blob-field' must be a BLOB field
in table schema.");
+ }
+ {
+ arrow::FieldVector fields = {f0, f1, f2, f3};
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> primary_keys = {};
+ std::vector<std::string> partition_keys = {"f3"};
+ std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+
{Options::ROW_TRACKING_ENABLED, "true"},
+
{Options::DATA_EVOLUTION_ENABLED, "true"},
+ {Options::BLOB_FIELD,
"f3"}};
+ ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options));
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
primary_keys, options));
+
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateRowTracking(*table_schema,
core_options),
+ "Blob field f3 cannot be a partition key.");
+ }
+}
+
+TEST(SchemaValidationTest, TestDuplicateField) {  // Same name listed twice in a key list.
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    {
+        // duplicate primary keys
+        std::vector<std::string> dup_primary_keys = {"f0", "f1", "f1"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
+                                                 dup_primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "primary key [f0, f1, f1] must not contain duplicate fields. Found: [f1]");
+    }
+    {
+        // duplicate partition keys
+        std::vector<std::string> dup_partition_keys = {"f1", "f1"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, dup_partition_keys,
+                                                 primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "partition key [f1, f1] must not contain duplicate fields. Found: [f1]");
+    }
+    {
+        // duplicate bucket keys
+        std::map<std::string, std::string> dup_options = {{Options::BUCKET, "2"},
+                                                          {Options::BUCKET_KEY, "f0,f0"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
+                                                 primary_keys, dup_options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "bucket key [f0, f0] must not contain duplicate fields. Found: [f0]");
+    }
+}
+
+TEST(SchemaValidationTest, TestNonExistField) {  // Keys naming a column absent from the schema.
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    {
+        // non-exist primary keys
+        std::vector<std::string> non_exist_primary_keys = {"f0", "f1", "non-exist"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
+                                                 non_exist_primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            R"(Table column ["f0", "f1", "f2"] should include all primary key constraint ["f0", "f1", "non-exist"])");
+    }
+    {
+        // non-exist partition keys
+        std::vector<std::string> non_exist_partition_keys = {"f1", "non-exist"};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, non_exist_partition_keys,
+                                                 primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            R"(Table column ["f0", "f1", "f2"] should include all partition fields ["f1", "non-exist"])");
+    }
+}
+
+TEST(SchemaValidationTest, NonPrimitivePrimaryKeyList) {  // list column f0 as primary key
+    auto value_field = arrow::field("values", arrow::int32());
+    auto f0 = arrow::field("f0", arrow::list(value_field));
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f0 is unsupported");
+}
+
+TEST(SchemaValidationTest, NonPrimitivePrimaryKeyMap) {  // map column f0 as primary key
+    auto f0 = arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32()));
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f0 is unsupported");
+}
+
+TEST(SchemaValidationTest, NonPrimitivePartitionKeyStruct) {  // struct column as partition key
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto child1 = arrow::field("inner1", arrow::int32());
+    auto child2 = arrow::field("inner2", arrow::float64());
+    auto f1 = arrow::field("f1", arrow::struct_({child1, child2}));
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                  {Options::BUCKET_KEY, "f0"}};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestComplexPartitionKey) {  // decimal128 column as partition key
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::decimal128(5, 2));
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, {}));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "partition field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestComplexPartitionKeyWithBlob) {  // BlobUtils field as partition key
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = BlobUtils::ToArrowField("f1");
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, /*primary_keys=*/{}, {}));
+    ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                        "partition field f1 is unsupported");
+}
+
+TEST(SchemaValidationTest, TestDateTypePartitionKey) {  // date32 column as partition key
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::date32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, {}));
+    ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+}
+
+TEST(SchemaValidationTest, ValidateFieldsPrefix) {  // Option keys starting with "fields."
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // key names f3, which is not in the schema
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, {"fields.f0,f1,f3", "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "f3 can not be found in table schema.");
+    }
+    {  // key names only schema columns
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, {"fields.f0,f1,f2", "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // key is Options::FIELDS_DEFAULT_AGG_FUNC
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {Options::FIELDS_DEFAULT_AGG_FUNC, "some_value"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // bare "fields." key with no field names
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"}, {Options::BUCKET_KEY, "f0"}, {"fields.", "f1"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "invalid options key fields.");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateBucket) {  // Bucket / bucket-key option combinations.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    {  // bucket = -1 together with a bucket-key
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      {Options::BUCKET_KEY, "f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "please specify a bucket number.");
+    }
+    {  // bucket = 0
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "0"},
+                                                      {Options::BUCKET_KEY, "f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "The number of buckets needs to be greater than 0.");
+    }
+    {  // bucket = 2; partition key f2 is not among the primary keys
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "You should use dynamic bucket (bucket = -1) mode in cross partition update case");
+    }
+    {  // bucket = 2 with neither primary keys nor a bucket-key
+        std::vector<std::string> primary_keys = {};
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "You should define a 'bucket-key' for bucketed append mode");
+    }
+    {  // 'full-compaction.delta-commits' set; no bucket option, no primary keys
+        std::vector<std::string> partition_keys = {"f2"};
+        std::map<std::string, std::string> options = {{"full-compaction.delta-commits", "2"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, partition_keys,
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "AppendOnlyTable of unware or dynamic bucket does not support "
+                            "'full-compaction.delta-commits");
+    }
+    {  // bucket-key f3 is a map column
+        auto f3 = arrow::field("f3", arrow::map(arrow::utf8(), arrow::int32()));
+        arrow::FieldVector new_fields = {f0, f1, f2, f3};
+        auto new_schema = arrow::schema(new_fields);
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f3"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, new_schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Nested type cannot be in bucket-key, in your table these keys are: f3");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateDeletionVector) {  // Deletion vectors with other options.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // deletion vectors + changelog-producer = full-compaction
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {Options::DELETION_VECTORS_ENABLED, "true"},
+            {Options::CHANGELOG_PRODUCER, "full-compaction"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer yet");
+    }
+    {  // deletion vectors + merge-engine = first-row
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {Options::DELETION_VECTORS_ENABLED, "true"},
+                                                      {Options::MERGE_ENGINE, "first-row"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "First row merge engine does not need deletion vectors because there "
+                            "is no deletion of old data in this merge engine.");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateSequenceField) {  // Options::SEQUENCE_FIELD combinations.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    std::vector<std::string> primary_keys = {"f0", "f1"};
+    std::vector<std::string> partition_keys = {"f1"};
+    {  // every sequence field is a schema column
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {Options::SEQUENCE_FIELD, "f0,f1,f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // sequence field f3 is not in the schema
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {Options::SEQUENCE_FIELD, "f0,f1,f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "cannot be found in table schema.");
+    }
+    {  // sequence field together with merge-engine = first-row
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {Options::SEQUENCE_FIELD, "f0,f1,f2"},
+                                                      {Options::MERGE_ENGINE, "first-row"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Do not support using sequence field on FIRST_ROW merge engine.");
+    }
+    {  // bucket = -1; partition key f2 is not among the primary keys
+        std::map<std::string, std::string> options = {{Options::BUCKET, "-1"},
+                                                      {Options::SEQUENCE_FIELD, "f0,f1,f2"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{"f2"},
+                                                 /*primary_keys=*/{"f0", "f1"}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "You cannot use sequence.field in cross partition update case (Primary "
+                            "key constraint 'f0, f1' not including all partition fields 'f2').");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateSequenceGroup) {  // "fields.<names>.sequence-group" options.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    {  // group key and value both name schema columns
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {"fields.f0,f1.sequence-group", "f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    }
+    {  // group key names f3, which is not in the schema
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {"fields.f0,f3.sequence-group", "f2"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Field f3 can not be found in table schema.");
+    }
+    {  // group value names f3, which is not in the schema
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {"fields.f0,f1.sequence-group", "f3"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Field f3 can not be found in table schema.");
+    }
+    {  // two groups both list f0,f1 as their values
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {{Options::BUCKET, "2"},
+                                                      {Options::BUCKET_KEY, "f0"},
+                                                      {"fields.f0,f1.sequence-group", "f0,f1"},
+                                                      {"fields.f2.sequence-group", "f0,f1"}};
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "defined repeatedly by multiple groups");
+    }
+    {  // aggregate function on f0, which is also part of a sequence-group key
+        std::vector<std::string> primary_keys = {"f0", "f1"};
+        std::vector<std::string> partition_keys = {"f1"};
+        std::map<std::string, std::string> options = {
+            {Options::BUCKET, "2"},
+            {Options::BUCKET_KEY, "f0"},
+            {"fields.f0,f1.sequence-group", "f2"},
+            {"fields.f0.aggregate-function", "min"},
+        };
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Should not define aggregation function on sequence group");
+    }
+}
+
+TEST(SchemaValidationTest, ValidateInvalidConfiguration) {  // Assorted rejected configurations.
+    auto f0 = arrow::field("f0", arrow::utf8());
+    auto f1 = arrow::field("f1", arrow::int32());
+    auto f2 = arrow::field("f2", arrow::float64());
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto schema = arrow::schema(fields);
+    {  // changelog-producer on a table without primary keys
+        std::map<std::string, std::string> options = {{Options::CHANGELOG_PRODUCER, "input"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Can not set changelog-producer on table without primary keys, please "
+                            "define primary keys.");
+    }
+    {  // schema contains a column named _SEQUENCE_NUMBER
+        auto invalid_field = arrow::field("_SEQUENCE_NUMBER", arrow::int64());
+        arrow::FieldVector invalid_fields = fields;
+        invalid_fields.push_back(invalid_field);
+        auto invalid_schema = arrow::schema(invalid_fields);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, invalid_schema, /*partition_keys=*/{},
+                                /*primary_keys=*/{}, /*options=*/{}));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "field name '_SEQUENCE_NUMBER' in schema cannot be special field.");
+    }
+    {  // schema contains a column whose name starts with _KEY_
+        auto invalid_field = arrow::field("_KEY_a", arrow::int64());
+        arrow::FieldVector invalid_fields = fields;
+        invalid_fields.push_back(invalid_field);
+        auto invalid_schema = arrow::schema(invalid_fields);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<TableSchema> table_schema,
+            TableSchema::Create(/*schema_id=*/0, invalid_schema, /*partition_keys=*/{},
+                                /*primary_keys=*/{}, /*options=*/{}));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "field name '_KEY_a' in schema cannot start with '_KEY_'");
+    }
+    {  // changelog-producer = input, merge-engine = first-row, primary key f0
+        std::map<std::string, std::string> options = {{Options::CHANGELOG_PRODUCER, "input"},
+                                                      {Options::MERGE_ENGINE, "first-row"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer yet");
+    }
+    {  // changelog-producer = lookup, primary key f0
+        std::map<std::string, std::string> options = {{Options::CHANGELOG_PRODUCER, "lookup"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "C++ Paimon does not support changelog-producer yet");
+    }
+    // test for row tracking
+    {  // row tracking with bucket = 1
+        std::map<std::string, std::string> options = {{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      {Options::BUCKET, "1"},
+                                                      {Options::BUCKET_KEY, "f0"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(
+            SchemaValidation::ValidateTableSchema(*table_schema),
+            "Cannot define bucket for row tracking table, it only support bucket = -1");
+    }
+    {  // row tracking with primary key f0
+        std::map<std::string, std::string> options = {{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      {Options::BUCKET, "-1"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{"f0"}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Cannot define primary key for row tracking table");
+    }
+    {  // data evolution enabled without row tracking
+        std::map<std::string, std::string> options = {{Options::DATA_EVOLUTION_ENABLED, "true"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Data evolution config must enabled with row-tracking.enabled");
+    }
+    {  // data evolution enabled together with deletion vectors
+        std::map<std::string, std::string> options = {{Options::ROW_TRACKING_ENABLED, "true"},
+                                                      {Options::DATA_EVOLUTION_ENABLED, "true"},
+                                                      {Options::DELETION_VECTORS_ENABLED, "true"}};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{},
+                                                 /*primary_keys=*/{}, options));
+        ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
+                            "Data evolution config must disabled with deletion-vectors.enabled");
+    }
+}
+} // namespace paimon::test