This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ba36d49 feat(schema): add table schema and schema manager (#54)
ba36d49 is described below
commit ba36d497bf41b4b1dd0408dd74bd772cf3f96a33
Author: Yonghao Fang <[email protected]>
AuthorDate: Mon Jun 8 15:02:30 2026 +0800
feat(schema): add table schema and schema manager (#54)
---
include/paimon/schema/schema.h | 110 ++
src/paimon/core/schema/schema_manager.cpp | 124 +++
src/paimon/core/schema/schema_manager.h | 75 ++
src/paimon/core/schema/schema_manager_test.cpp | 203 ++++
src/paimon/core/schema/table_schema.cpp | 364 +++++++
src/paimon/core/schema/table_schema.h | 155 +++
src/paimon/core/schema/table_schema_test.cpp | 1301 ++++++++++++++++++++++++
7 files changed, 2332 insertions(+)
diff --git a/include/paimon/schema/schema.h b/include/paimon/schema/schema.h
new file mode 100644
index 0000000..e3f557f
--- /dev/null
+++ b/include/paimon/schema/schema.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// This interface provides access to TableSchema-related information.
+class PAIMON_EXPORT Schema {
+ public:
+    virtual ~Schema() = default;
+
+    /// Get the Arrow C data interface representation of this table schema.
+    ///
+    /// Per the Arrow C data interface, the receiver of the returned struct is its consumer and
+    /// must invoke the struct's `release` callback once it no longer needs it.
+    ///
+    /// @return A result containing an ArrowSchema, or an error status if conversion fails.
+    virtual Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const = 0;
+
+    /// Get the JSON representation of this table schema.
+    ///
+    /// The returned string describes the complete schema information.
+    ///
+    /// @return A result containing the JSON string, or an error status on failure.
+    virtual Result<std::string> GetJsonSchema() const = 0;
+
+    /// Get the names of all fields in the table schema.
+    /// @return A vector of field names.
+    virtual std::vector<std::string> FieldNames() const = 0;
+
+    /// Get the type of a specific field.
+    ///
+    /// @param field_name The name of the field to query.
+    /// @return A result containing the FieldType of the specified field, or an error status on
+    ///         failure.
+    virtual Result<FieldType> GetFieldType(const std::string& field_name) const = 0;
+
+    /// Get the optional comment describing the schema object.
+    /// @return The comment if set, or std::nullopt otherwise.
+    virtual std::optional<std::string> Comment() const = 0;
+};
+
+/// Schema contract for data tables.
+class PAIMON_EXPORT DataSchema : public Schema {
+ public:
+    ~DataSchema() override = default;
+
+    /// Get the unique identifier of this table schema.
+    /// @return The schema id.
+    virtual int64_t Id() const = 0;
+
+    /// Get the list of primary key field names.
+    /// @return A reference to the vector of primary key names; empty if no primary keys are
+    ///         defined.
+    virtual const std::vector<std::string>& PrimaryKeys() const = 0;
+
+    /// Get the list of partition key field names.
+    /// @return A reference to the vector of partition key names; empty if the table is not
+    ///         partitioned.
+    virtual const std::vector<std::string>& PartitionKeys() const = 0;
+
+    /// Get the list of bucket key field names used for bucketing.
+    /// @return A reference to the vector of bucket key names.
+    virtual const std::vector<std::string>& BucketKeys() const = 0;
+
+    /// Get the number of buckets configured for this table.
+    /// @return The number of buckets.
+    virtual int32_t NumBuckets() const = 0;
+
+    /// Get the highest field id assigned in this schema.
+    /// @return The maximum field id.
+    virtual int32_t HighestFieldId() const = 0;
+
+    /// Get the table-level options associated with this schema.
+    /// @return A reference to the map of option key-value pairs (e.g., file format, filesystem).
+    virtual const std::map<std::string, std::string>& Options() const = 0;
+};
+
+/// Schema contract for system tables.
+class PAIMON_EXPORT SystemSchema : public Schema {
+ public:
+    ~SystemSchema() override = default;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager.cpp b/src/paimon/core/schema/schema_manager.cpp
new file mode 100644
index 0000000..2425cc4
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_manager.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/schema/schema_validation.h"
+#include "paimon/core/utils/branch_manager.h"
+#include "paimon/core/utils/file_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+// Forward declaration: arrow::Schema only appears behind std::shared_ptr references here.
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+
+/// Creates a manager bound to the main branch of the table rooted at `table_root`.
+SchemaManager::SchemaManager(const std::shared_ptr<FileSystem>& file_system,
+                             const std::string& table_root)
+    : SchemaManager(file_system, table_root, BranchManager::DEFAULT_MAIN_BRANCH) {}
+
+/// Creates a manager bound to `branch` of the table rooted at `table_root`. The branch name is
+/// passed through BranchManager::NormalizeBranch before being stored.
+SchemaManager::SchemaManager(const std::shared_ptr<FileSystem>& file_system,
+                             const std::string& table_root, const std::string& branch)
+    : file_system_(file_system),
+      table_root_(table_root),
+      branch_(BranchManager::NormalizeBranch(branch)) {}
+
+std::string SchemaManager::BranchPath() const {
+    // The on-disk location of a branch is decided by BranchManager, not by this class.
+    std::string branch_root = BranchManager::BranchPath(table_root_, branch_);
+    return branch_root;
+}
+
+std::string SchemaManager::ToSchemaPath(int64_t schema_id) const {
+    // Relative part "/schema/schema-<id>", joined onto the branch directory.
+    std::string relative = "/schema/";
+    relative += SCHEMA_PREFIX;
+    relative += std::to_string(schema_id);
+    return PathUtil::JoinPath(BranchPath(), relative);
+}
+
+/// Read the schema with the highest id found in the schema directory.
+///
+/// @return The newest schema, std::nullopt when no schema file exists, or an error status if
+///         listing or reading fails.
+Result<std::optional<std::shared_ptr<TableSchema>>> SchemaManager::Latest() const {
+    // Reuse ListAllIds() so both methods discover schema files in exactly the same way.
+    PAIMON_ASSIGN_OR_RAISE(std::vector<int64_t> versions, ListAllIds());
+    if (versions.empty()) {
+        return std::optional<std::shared_ptr<TableSchema>>();
+    }
+    // The listing carries no ordering guarantee here, so search for the maximum explicitly.
+    const int64_t max_schema_id = *std::max_element(versions.begin(), versions.end());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<TableSchema> schema, ReadSchema(max_schema_id));
+    return std::optional<std::shared_ptr<TableSchema>>(std::move(schema));
+}
+
+Result<std::shared_ptr<TableSchema>> SchemaManager::ReadSchema(int64_t schema_id) const {
+    // A previously parsed schema is returned straight from the cache.
+    if (auto cached = schema_cache_.find(schema_id); cached != schema_cache_.end()) {
+        return cached->second;
+    }
+    const auto schema_file = ToSchemaPath(schema_id);
+    std::string json;
+    PAIMON_RETURN_NOT_OK(file_system_->ReadFile(schema_file, &json));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<TableSchema> loaded,
+                           TableSchema::CreateFromJson(json));
+    // Remember the parsed schema for later reads of the same id.
+    schema_cache_[schema_id] = loaded;
+    return loaded;
+}
+
+std::string SchemaManager::SchemaDirectory() const {
+    // Every "schema-<id>" file of the branch sits directly in this directory.
+    const std::string branch_root = BranchPath();
+    return PathUtil::JoinPath(branch_root, "/schema");
+}
+
+Result<bool> SchemaManager::SchemaExists(int64_t id) const {
+    // Checked directly against the file system; the in-memory schema cache is not consulted.
+    const std::string candidate = ToSchemaPath(id);
+    return file_system_->Exists(candidate);
+}
+
+Result<std::vector<int64_t>> SchemaManager::ListAllIds() const {
+    // Presumably ListVersionedFiles collects the numeric suffixes of files named
+    // SCHEMA_PREFIX + <id> in the schema directory; verify against FileUtils.
+    std::vector<int64_t> ids;
+    PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFiles(
+        file_system_, SchemaDirectory(), std::string(SCHEMA_PREFIX), &ids));
+    return ids;
+}
+
+/// Create and persist schema 0 for a table that has no schema yet.
+///
+/// @return The created schema, or an error status when a schema already exists (including a
+///         schema 0 written concurrently by another writer), when the inputs are rejected by
+///         TableSchema::Create or SchemaValidation, or when writing the schema file fails.
+Result<std::unique_ptr<TableSchema>> SchemaManager::CreateTable(
+    const std::shared_ptr<arrow::Schema>& schema, const std::vector<std::string>& partition_keys,
+    const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options) {
+    PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema, Latest());
+    if (latest_schema) {
+        return Status::Invalid("Schema in filesystem exists, creation is not allowed.");
+    }
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<TableSchema> table_schema,
+        TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options));
+    PAIMON_RETURN_NOT_OK(SchemaValidation::ValidateTableSchema(*table_schema));
+    PAIMON_ASSIGN_OR_RAISE(std::string content, table_schema->ToJsonString());
+    const std::string schema_path = ToSchemaPath(/*schema_id=*/0);
+    auto status = file_system_->AtomicStore(schema_path, content);
+    if (!status.ok()) {
+        // Do not retry blindly: a persistent I/O error would make a retry loop spin forever.
+        // If schema 0 exists now, another writer created the table first; otherwise the
+        // store itself failed and its status is returned to the caller.
+        PAIMON_ASSIGN_OR_RAISE(bool exists, SchemaExists(/*id=*/0));
+        if (exists) {
+            return Status::Invalid("Schema in filesystem exists, creation is not allowed.");
+        }
+        return status;
+    }
+    return table_schema;
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager.h b/src/paimon/core/schema/schema_manager.h
new file mode 100644
index 0000000..382fa30
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+class FileSystem;
+
+/// Schema Manager to manage schema versions.
+///
+/// Schema files of a branch are stored as `<branch path>/schema/schema-<id>`.
+///
+/// NOTE(review): ReadSchema() (and Latest(), which calls it) writes to a mutable, unsynchronized
+/// cache from const methods, so a single instance should not be used from several threads
+/// without external locking — confirm the intended threading model.
+class SchemaManager {
+ public:
+    /// Create a manager for the main branch of the table rooted at `table_root`.
+    SchemaManager(const std::shared_ptr<FileSystem>& file_system, const std::string& table_root);
+    /// Create a manager for `branch` of the table rooted at `table_root`.
+    /// Specify the default branch for data writing.
+    SchemaManager(const std::shared_ptr<FileSystem>& file_system, const std::string& table_root,
+                  const std::string& branch);
+
+    /// Read schema for schema id. Find schema in cache first.
+    /// @return The schema, or an error status if it cannot be read or parsed.
+    Result<std::shared_ptr<TableSchema>> ReadSchema(int64_t schema_id) const;
+
+    /// Read the schema with the highest id.
+    /// @return The newest schema, or std::nullopt if no schema file exists.
+    Result<std::optional<std::shared_ptr<TableSchema>>> Latest() const;
+
+    /// Create schema 0 of a new table and write it to the schema directory.
+    /// @return The created schema, or an error status if a schema already exists or the inputs
+    ///         are invalid.
+    Result<std::unique_ptr<TableSchema>> CreateTable(
+        const std::shared_ptr<arrow::Schema>& schema,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options);
+
+    /// @return The directory holding the schema files of the configured branch.
+    std::string SchemaDirectory() const;
+    /// @return Whether the schema file for `id` exists.
+    Result<bool> SchemaExists(int64_t id) const;
+    /// @return The ids of all schema files in SchemaDirectory().
+    Result<std::vector<int64_t>> ListAllIds() const;
+
+ private:
+    std::string BranchPath() const;
+    std::string ToSchemaPath(int64_t schema_id) const;
+
+    static constexpr char SCHEMA_PREFIX[] = "schema-";
+
+    std::shared_ptr<FileSystem> file_system_;
+    std::string table_root_;
+    const std::string branch_;
+    /// Schemas already read by ReadSchema(), keyed by schema id.
+    mutable std::map<int64_t, std::shared_ptr<TableSchema>> schema_cache_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/schema_manager_test.cpp b/src/paimon/core/schema/schema_manager_test.cpp
new file mode 100644
index 0000000..83f995a
--- /dev/null
+++ b/src/paimon/core/schema/schema_manager_test.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/schema_manager.h"
+
+#include <set>
+#include <utility>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(SchemaManagerTest, TestSimple) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root = paimon::test::GetDataDir() +
+                             "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/";
+    SchemaManager manager(fs, table_root);
+    ASSERT_EQ(manager.ToSchemaPath(/*schema_id=*/0),
+              paimon::test::GetDataDir() +
+                  "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/schema/schema-0");
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> ret, manager.ReadSchema(/*schema_id=*/1));
+    std::string schema_json = R"({
+ "version" : 3,
+ "id" : 1,
+ "fields" : [ {
+ "id" : 1,
+ "name" : "key1",
+ "type" : "INT NOT NULL"
+ }, {
+ "id" : 7,
+ "name" : "k",
+ "type" : "STRING"
+ }, {
+ "id" : 2,
+ "name" : "key_2",
+ "type" : "INT NOT NULL"
+ }, {
+ "id" : 4,
+ "name" : "c",
+ "type" : "INT"
+ }, {
+ "id" : 8,
+ "name" : "d",
+ "type" : "INT",
+ "description" : ""
+ }, {
+ "id" : 6,
+ "name" : "a",
+ "type" : "INT"
+ }, {
+ "id" : 0,
+ "name" : "key0",
+ "type" : "INT NOT NULL"
+ }, {
+ "id" : 9,
+ "name" : "e",
+ "type" : "INT"
+ } ],
+ "highestFieldId" : 9,
+ "partitionKeys" : [ "key0", "key1" ],
+ "primaryKeys" : [ "key0", "key1", "key_2" ],
+ "options" : {
+ "bucket" : "1",
+ "manifest.format" : "orc",
+ "file.format" : "orc",
+ "deletion-vectors.enabled" : "true",
+ "commit.force-compact" : "true"
+ },
+ "timeMillis" : 1730516111087
+ })";
+    ASSERT_OK_AND_ASSIGN(auto expected_schema, TableSchema::CreateFromJson(schema_json));
+    ASSERT_EQ(*ret, *expected_schema);
+    // TableSchema::operator== does not compare the schema id or the highest field id, so pin
+    // both explicitly.
+    ASSERT_EQ(ret->Id(), 1);
+    ASSERT_EQ(ret->HighestFieldId(), 9);
+    ASSERT_FALSE(manager.schema_cache_.empty());
+    // A second read of the same id is served from the cache.
+    ASSERT_EQ(*manager.ReadSchema(/*schema_id=*/1).value(), *expected_schema);
+    // Schema 1 is the newest schema of this table, so Latest() must return it.
+    ASSERT_EQ(*(manager.Latest().value().value()), *expected_schema);
+}
+
+TEST(SchemaManagerTest, TestNonExistTable) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root = paimon::test::GetDataDir() + "/non-exist.db/non-exist/";
+    SchemaManager manager(fs, table_root);
+    // A missing table is not an error: there is simply no latest schema and no schema id.
+    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> latest, manager.Latest());
+    ASSERT_EQ(latest, std::nullopt);
+    ASSERT_OK_AND_ASSIGN(std::vector<int64_t> ids, manager.ListAllIds());
+    ASSERT_TRUE(ids.empty());
+    // Reading a concrete schema that does not exist, however, must fail.
+    auto ret = manager.ReadSchema(/*schema_id=*/100);
+    ASSERT_FALSE(ret.ok());
+}
+
+TEST(SchemaManagerTest, TestSchemaDirectory) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root = paimon::test::GetDataDir() + "/sample_table/";
+    SchemaManager manager(fs, table_root);
+    // The main branch keeps its schema files directly under "<table root>/schema".
+    ASSERT_EQ(manager.SchemaDirectory(), paimon::test::GetDataDir() + "/sample_table/schema");
+}
+
+TEST(SchemaManagerTest, TestSchemaDirectoryWithBranch) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root = paimon::test::GetDataDir() + "/sample_table/";
+    {
+        // A non-main branch lives under "branch/branch-<name>".
+        SchemaManager manager(fs, table_root, /*branch=*/"data");
+        ASSERT_EQ(manager.SchemaDirectory(),
+                  paimon::test::GetDataDir() + "/sample_table/branch/branch-data/schema");
+    }
+    {
+        // Naming the main branch explicitly maps to the table root itself.
+        SchemaManager manager(fs, table_root, /*branch=*/"main");
+        ASSERT_EQ(manager.SchemaDirectory(), paimon::test::GetDataDir() + "/sample_table/schema");
+    }
+}
+
+TEST(SchemaManagerTest, TestCreateTableWithInvalidInput) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    auto dir = UniqueTestDirectory::Create();
+    SchemaManager manager(fs, dir->Str());
+
+    // Create an Arrow schema
+    auto field1 = std::make_shared<arrow::Field>("id", arrow::int32(), false);
+    auto field2 = std::make_shared<arrow::Field>("name", arrow::utf8());
+    auto field3 = std::make_shared<arrow::Field>("value", arrow::int64());
+    auto schema = arrow::schema(arrow::FieldVector{field1, field2, field3});
+
+    // "id" is both the only primary key and a partition key, which leaves no trimmed primary
+    // key and therefore must be rejected.
+    std::vector<std::string> partition_keys = {"id"};
+    std::vector<std::string> primary_keys = {"id"};
+    std::map<std::string, std::string> options = {{"file.format", "orc"},
+                                                  {"commit.force-compact", "true"}};
+
+    ASSERT_NOK_WITH_MSG(manager.CreateTable(schema, partition_keys, primary_keys, options),
+                        "should not be same with partition fields");
+
+    // Validation fails before anything is written, so the table still has no schema.
+    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> latest, manager.Latest());
+    ASSERT_EQ(latest, std::nullopt);
+}
+
+TEST(SchemaManagerTest, TestCreateTable) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    auto dir = UniqueTestDirectory::Create();
+    SchemaManager manager(fs, dir->Str());
+
+    // Create an Arrow schema
+    auto field1 = std::make_shared<arrow::Field>("id", arrow::int32(), false);
+    auto field2 = std::make_shared<arrow::Field>("name", arrow::utf8());
+    auto field3 = std::make_shared<arrow::Field>("value", arrow::int64());
+    auto schema = arrow::schema(arrow::FieldVector{field1, field2, field3});
+
+    std::vector<std::string> partition_keys = {"name"};
+    std::vector<std::string> primary_keys = {"id"};
+    std::map<std::string, std::string> options = {{"file.format", "orc"},
+                                                  {"commit.force-compact", "true"}};
+
+    // Create table; the returned schema must be schema 0.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> created,
+                         manager.CreateTable(schema, partition_keys, primary_keys, options));
+    ASSERT_EQ(created->Id(), 0);
+
+    // Verify the schema was persisted and reads back with the same layout.
+    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> latest_result,
+                         manager.Latest());
+    ASSERT_TRUE(latest_result);
+    auto created_schema = latest_result.value();
+    ASSERT_EQ(created_schema->Id(), 0);
+    ASSERT_EQ(created_schema->FieldNames(), created->FieldNames());
+    ASSERT_EQ(created_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(created_schema->PrimaryKeys(), primary_keys);
+}
+
+TEST(SchemaManagerTest, TestCreateTableAlreadyExists) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root = paimon::test::GetDataDir() +
+                             "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/";
+    SchemaManager manager(fs, table_root);
+
+    // Create an Arrow schema
+    auto field = std::make_shared<arrow::Field>("dummy", arrow::int32());
+    auto schema = arrow::schema(arrow::FieldVector{field});
+
+    // The table already has schema files, so creation is rejected before any validation/write.
+    ASSERT_NOK_WITH_MSG(manager.CreateTable(schema, {}, {}, {}), "Schema in filesystem exists");
+}
+
+TEST(SchemaManagerTest, TestListAllIds) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string table_root =
+        paimon::test::GetDataDir() + "/orc/pk_table_with_mor.db/pk_table_with_mor/";
+    SchemaManager manager(fs, table_root);
+    ASSERT_OK_AND_ASSIGN(auto ids, manager.ListAllIds());
+    // Compare as sets so the test does not depend on the directory listing order.
+    ASSERT_EQ(std::set<int64_t>(ids.begin(), ids.end()), std::set<int64_t>({0, 1, 2, 3, 4}));
+}
+} // namespace paimon::test
diff --git a/src/paimon/core/schema/table_schema.cpp b/src/paimon/core/schema/table_schema.cpp
new file mode 100644
index 0000000..a029461
--- /dev/null
+++ b/src/paimon/core/schema/table_schema.cpp
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/table_schema.h"
+
+#include <algorithm>
+#include <iterator>
+#include <set>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/defs.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Build a new table schema from an Arrow schema.
+///
+/// Field ids are assigned starting from 0 (see AssignFieldIdsRecursively), primary key columns
+/// are forced to NOT NULL, and the result is validated by InitSchema.
+///
+/// @return The schema, or NotImplemented when `schema_id` is not 0.
+Result<std::unique_ptr<TableSchema>> TableSchema::Create(
+    int64_t schema_id, const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<std::string>& partition_keys, const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options) {
+    if (schema_id != 0) {
+        return Status::NotImplemented("do not support schema evolution, schema_id must be 0");
+    }
+    const std::set<std::string> primary_key_set(primary_keys.begin(), primary_keys.end());
+    std::vector<DataField> data_fields;
+    data_fields.reserve(schema->fields().size());
+    // Next id to hand out; after the loop it is one past the highest assigned id.
+    int32_t field_id = 0;
+    for (const auto& field : schema->fields()) {
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr<arrow::Field> field_with_id,
+            AssignFieldIdsRecursively(field, /*assign_id_to_self=*/true, &field_id));
+        if (primary_key_set.count(field_with_id->name()) > 0) {
+            // Primary key columns are always stored as NOT NULL.
+            field_with_id = field_with_id->WithNullable(false);
+        }
+        PAIMON_ASSIGN_OR_RAISE(DataField data_field,
+                               DataField::ConvertArrowFieldToDataField(field_with_id));
+        data_fields.push_back(std::move(data_field));
+    }
+    return InitSchema(schema_id, data_fields, /*highest_field_id=*/field_id - 1, partition_keys,
+                      primary_keys, options, /*comment=*/std::nullopt,
+                      DateTimeUtils::GetCurrentUTCTimeUs() / 1000);
+}
+
+Result<std::shared_ptr<arrow::KeyValueMetadata>> TableSchema::MakeMetaDataWithFieldId(
+    const std::shared_ptr<arrow::Field>& field, int32_t field_id) {
+    // Metadata carrying only the id that is about to be assigned.
+    const std::string assigned_id = std::to_string(field_id);
+    std::vector<std::string> id_keys = {std::string(DataField::FIELD_ID)};
+    std::vector<std::string> id_values = {assigned_id};
+    std::shared_ptr<arrow::KeyValueMetadata> id_metadata =
+        arrow::KeyValueMetadata::Make(id_keys, id_values);
+    const auto& existing = field->metadata();
+    if (!field->HasMetadata() || !existing) {
+        return id_metadata;
+    }
+    // An id already present on the field must agree with the one being assigned.
+    if (existing->Contains(DataField::FIELD_ID)) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::string present_id,
+                                          existing->Get(DataField::FIELD_ID));
+        if (present_id != assigned_id) {
+            return Status::Invalid(fmt::format("field id {} not match with {} {} in metadata",
+                                               field_id, DataField::FIELD_ID, present_id));
+        }
+    }
+    // Keep every pre-existing metadata entry next to the id.
+    return id_metadata->Merge(*existing);
+}
+
+/// Return a copy of `field` annotated with field ids.
+///
+/// When `assign_id_to_self` is true, `field` receives the id `*field_id` (merged into its
+/// metadata by MakeMetaDataWithFieldId) and the counter is incremented. Struct children always
+/// receive ids. List elements and map keys/items do not, but are still traversed so that
+/// structs nested inside them are numbered. Ids are handed out parent-before-children.
+///
+/// @param field The field to annotate.
+/// @param assign_id_to_self Whether `field` itself receives an id.
+/// @param field_id In/out counter holding the next id to hand out.
+Result<std::shared_ptr<arrow::Field>> TableSchema::AssignFieldIdsRecursively(
+    const std::shared_ptr<arrow::Field>& field, bool assign_id_to_self, int32_t* field_id) {
+    std::shared_ptr<arrow::KeyValueMetadata> metadata;
+    if (assign_id_to_self) {
+        PAIMON_ASSIGN_OR_RAISE(metadata, MakeMetaDataWithFieldId(field, *field_id));
+        (*field_id)++;
+    }
+    const std::shared_ptr<arrow::DataType>& type = field->type();
+    switch (type->id()) {
+        case arrow::Type::STRUCT: {
+            auto struct_type = arrow::internal::checked_pointer_cast<arrow::StructType>(type);
+            arrow::FieldVector new_children;
+            new_children.reserve(struct_type->fields().size());
+            for (const auto& child : struct_type->fields()) {
+                PAIMON_ASSIGN_OR_RAISE(
+                    std::shared_ptr<arrow::Field> new_child,
+                    AssignFieldIdsRecursively(child, /*assign_id_to_self=*/true, field_id));
+                new_children.push_back(std::move(new_child));
+            }
+            return arrow::field(field->name(), arrow::struct_(new_children), field->nullable(),
+                                metadata);
+        }
+        case arrow::Type::LIST: {
+            auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(type);
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> new_value_field,
+                                   AssignFieldIdsRecursively(list_type->value_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            return arrow::field(field->name(), arrow::list(new_value_field), field->nullable(),
+                                metadata);
+        }
+        case arrow::Type::MAP: {
+            auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(type);
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> key_field,
+                                   AssignFieldIdsRecursively(map_type->key_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Field> item_field,
+                                   AssignFieldIdsRecursively(map_type->item_field(),
+                                                             /*assign_id_to_self=*/false,
+                                                             field_id));
+            // Carry keys_sorted over; rebuilding with the default would silently reset it.
+            return arrow::field(field->name(),
+                                arrow::map(key_field->type(), item_field, map_type->keys_sorted()),
+                                field->nullable(), metadata);
+        }
+        default:
+            break;
+    }
+    return metadata ? field->WithMergedMetadata(metadata) : field;
+}
+
+rapidjson::Value TableSchema::ToJson(rapidjson::Document::AllocatorType* allocator) const
+    noexcept(false) {
+    rapidjson::Value obj(rapidjson::kObjectType);
+    // Serialize `value` with RapidJsonUtil and attach it under the (string literal) `key`.
+    auto add_member = [&obj, allocator](const char* key, const auto& value) {
+        obj.AddMember(rapidjson::StringRef(key),
+                      RapidJsonUtil::SerializeValue(value, allocator).Move(), *allocator);
+    };
+    add_member("version", version_);
+    add_member("id", id_);
+    add_member("fields", fields_);
+    add_member("highestFieldId", highest_field_id_);
+    add_member("partitionKeys", partition_keys_);
+    add_member("primaryKeys", primary_keys_);
+    add_member("options", options_);
+    // "comment" is omitted entirely when no comment is set.
+    if (comment_) {
+        add_member("comment", comment_);
+    }
+    add_member("timeMillis", time_millis_);
+    return obj;
+}
+
+TableSchema::TableSchema(int32_t version, int64_t id, const
std::vector<DataField>& fields,
+ int32_t highest_field_id, const
std::vector<std::string>& partition_keys,
+ const std::vector<std::string>& primary_keys,
+ const std::map<std::string, std::string>& options,
+ const std::optional<std::string>& comment, int64_t
time_millis)
+ : version_(version),
+ id_(id),
+ fields_(fields),
+ highest_field_id_(highest_field_id),
+ partition_keys_(partition_keys),
+ primary_keys_(primary_keys),
+ options_(options),
+ comment_(comment),
+ time_millis_(time_millis) {}
+
+// Compares version, fields, partition keys, primary keys, options, comment and time. The schema
+// id, the highest field id and the derived bucket keys / bucket count are NOT compared.
+bool TableSchema::operator==(const TableSchema& other) const {
+    if (version_ != other.version_ || fields_ != other.fields_) {
+        return false;
+    }
+    if (partition_keys_ != other.partition_keys_ || primary_keys_ != other.primary_keys_) {
+        return false;
+    }
+    if (options_ != other.options_ || comment_ != other.comment_) {
+        return false;
+    }
+    return time_millis_ == other.time_millis_;
+}
+
+Result<std::unique_ptr<::ArrowSchema>> TableSchema::GetArrowSchema() const {
+    // Export through the Arrow C data interface; the caller must release the exported struct.
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields_);
+    auto exported = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, exported.get()));
+    return exported;
+}
+
+std::vector<std::string> TableSchema::FieldNames() const {
+    // Names are returned in the order the fields are stored in the schema.
+    std::vector<std::string> names;
+    names.reserve(fields_.size());
+    for (const DataField& data_field : fields_) {
+        names.push_back(data_field.Name());
+    }
+    return names;
+}
+
+Result<FieldType> TableSchema::GetFieldType(const std::string& field_name) const {
+    // Lookup errors from GetField are propagated unchanged.
+    PAIMON_ASSIGN_OR_RAISE(DataField data_field, GetField(field_name));
+    const auto arrow_type_id = data_field.Type()->id();
+    return FieldTypeUtils::ConvertToFieldType(arrow_type_id);
+}
+
+Result<DataField> TableSchema::GetField(const std::string& field_name) const {
+    // Linear search by name; the first matching field wins.
+    const auto& all_fields = Fields();
+    auto match = std::find_if(all_fields.begin(), all_fields.end(),
+                              [&field_name](const DataField& f) { return f.Name() == field_name; });
+    if (match == all_fields.end()) {
+        return Status::Invalid(
+            fmt::format("Get field {} failed: not exist in table schema", field_name));
+    }
+    return *match;
+}
+
+Result<DataField> TableSchema::GetField(int32_t field_id) const {
+    // Linear search by field id; the first matching field wins.
+    const auto& all_fields = Fields();
+    auto match = std::find_if(all_fields.begin(), all_fields.end(),
+                              [field_id](const DataField& f) { return f.Id() == field_id; });
+    if (match == all_fields.end()) {
+        return Status::Invalid(
+            fmt::format("Get field with id {} failed: not exist in table schema", field_id));
+    }
+    return *match;
+}
+
+Result<std::vector<DataField>> TableSchema::GetFields(
+    const std::vector<std::string>& field_names) const {
+    // Fields are returned in the order of `field_names`; the first unknown name aborts.
+    std::vector<DataField> selected;
+    selected.reserve(field_names.size());
+    for (const std::string& wanted : field_names) {
+        PAIMON_ASSIGN_OR_RAISE(DataField data_field, GetField(wanted));
+        selected.push_back(std::move(data_field));
+    }
+    return selected;
+}
+
+Result<std::unique_ptr<TableSchema>> TableSchema::CreateFromJson(const std::string& json_str) {
+    // Parse into a temporary, then rebuild through InitSchema so validation and the derived
+    // bucket keys / bucket count are computed the same way as for new schemas. The parsed
+    // version is not carried over: InitSchema stamps TableSchema::CURRENT_VERSION.
+    PAIMON_ASSIGN_OR_RAISE(TableSchema parsed, TableSchema::FromJsonString(json_str));
+    return InitSchema(parsed.id_, parsed.fields_, parsed.highest_field_id_,
+                      parsed.partition_keys_, parsed.primary_keys_, parsed.options_,
+                      parsed.comment_, parsed.time_millis_);
+}
+
+Result<std::unique_ptr<TableSchema>> TableSchema::InitSchema(
+    int64_t schema_id, const std::vector<DataField>& fields, int32_t highest_field_id,
+    const std::vector<std::string>& partition_keys, const std::vector<std::string>& primary_keys,
+    const std::map<std::string, std::string>& options, const std::optional<std::string>& comment,
+    int64_t time_millis) {
+    // Validate the fields (via ArrowSchemaValidator::ValidateSchemaWithFieldId) before building.
+    auto validated_schema = DataField::ConvertDataFieldsToArrowSchema(fields);
+    PAIMON_RETURN_NOT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*validated_schema));
+
+    std::unique_ptr<TableSchema> result(new TableSchema(
+        TableSchema::CURRENT_VERSION, schema_id, fields, highest_field_id, partition_keys,
+        primary_keys, options, comment, time_millis));
+
+    // Trimmed primary keys are checked first so their error takes precedence.
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_pks, result->TrimmedPrimaryKeys());
+    // Explicit bucket keys win; without them the trimmed primary keys are used.
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> explicit_bucket_keys,
+                           result->OriginalBucketKeys());
+    if (explicit_bucket_keys.empty()) {
+        result->bucket_keys_ = trimmed_pks;
+    } else {
+        result->bucket_keys_ = explicit_bucket_keys;
+    }
+    // A missing bucket option yields -1.
+    PAIMON_ASSIGN_OR_RAISE(
+        result->num_bucket_,
+        OptionsUtils::GetValueFromMap<int32_t>(result->Options(), Options::BUCKET, -1));
+    return result;
+}
+
+void TableSchema::FromJson(const rapidjson::Value& obj) noexcept(false) {
+    // Keys are read in a fixed order; a missing "version" defaults to PAIMON_07_VERSION.
+    version_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, "version", PAIMON_07_VERSION);
+    id_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, "id");
+    fields_ = RapidJsonUtil::DeserializeKeyValue<std::vector<DataField>>(obj, "fields");
+    highest_field_id_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, "highestFieldId");
+    partition_keys_ =
+        RapidJsonUtil::DeserializeKeyValue<std::vector<std::string>>(obj, "partitionKeys");
+    primary_keys_ =
+        RapidJsonUtil::DeserializeKeyValue<std::vector<std::string>>(obj, "primaryKeys");
+    options_ =
+        RapidJsonUtil::DeserializeKeyValue<std::map<std::string, std::string>>(obj, "options");
+    // Materialize option defaults that older schema versions left implicit.
+    if (version_ <= PAIMON_07_VERSION && options_.count(Options::BUCKET) == 0) {
+        options_[Options::BUCKET] = "1";
+    }
+    if (version_ <= PAIMON_08_VERSION && options_.count(Options::FILE_FORMAT) == 0) {
+        options_[Options::FILE_FORMAT] = "orc";
+    }
+    comment_ =
+        RapidJsonUtil::DeserializeKeyValue<std::optional<std::string>>(obj, "comment", comment_);
+    time_millis_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, "timeMillis", 0);
+}
+
+Result<std::vector<std::string>> TableSchema::TrimmedPrimaryKeys() const {
+    if (primary_keys_.empty()) {
+        return primary_keys_;
+    }
+    // Keep only the primary key columns that are not also partition columns.
+    const std::set<std::string> partition_columns(partition_keys_.begin(), partition_keys_.end());
+    std::vector<std::string> trimmed;
+    trimmed.reserve(primary_keys_.size());
+    std::copy_if(primary_keys_.begin(), primary_keys_.end(), std::back_inserter(trimmed),
+                 [&partition_columns](const std::string& pk) {
+                     return partition_columns.count(pk) == 0;
+                 });
+    if (trimmed.empty()) {
+        return Status::Invalid(
+            fmt::format("Primary key constraint {} should not be same with partition "
+                        "fields {}, this will result in only one record in a partition",
+                        primary_keys_, partition_keys_));
+    }
+    return trimmed;
+}
+
+// Data fields of the trimmed primary key, in primary key order.
+Result<std::vector<DataField>> TableSchema::TrimmedPrimaryKeyFields() const {
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> key_names, TrimmedPrimaryKeys());
+    return GetFields(key_names);
+}
+
+// Arrow schema made of the trimmed primary key fields.
+Result<std::shared_ptr<arrow::Schema>> TableSchema::TrimmedPrimaryKeySchema() const {
+    PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> key_fields, TrimmedPrimaryKeyFields());
+    return DataField::ConvertDataFieldsToArrowSchema(key_fields);
+}
+
+/// Original bucket keys, i.e. the keys configured through the Options::BUCKET_KEY option.
+///
+/// The option value is split on ','. A missing option, or one that is empty or
+/// whitespace-only, yields an empty vector. Otherwise Status::Invalid is returned when:
+///  - some bucket key is not a field name of this schema;
+///  - some bucket key is also a partition key;
+///  - the table has primary keys and some bucket key is not among them.
+Result<std::vector<std::string>> TableSchema::OriginalBucketKeys() const {
+    auto iter = options_.find(Options::BUCKET_KEY);
+    if (iter == options_.end() || StringUtils::IsNullOrWhitespaceOnly(iter->second)) {
+        return std::vector<std::string>();
+    }
+    std::vector<std::string> bucket_keys =
+        StringUtils::Split(iter->second, ",", /*ignore_empty=*/false);
+
+    // Computed once: FieldNames() builds a fresh vector on every call.
+    const std::vector<std::string> field_names = FieldNames();
+    if (!ObjectUtils::ContainsAll(field_names, bucket_keys)) {
+        return Status::Invalid(fmt::format("Field names {} should contain all bucket keys {}.",
+                                           field_names, bucket_keys));
+    }
+
+    const bool any_in_partition = std::any_of(
+        bucket_keys.begin(), bucket_keys.end(), [this](const std::string& bucket_key) {
+            return std::find(partition_keys_.begin(), partition_keys_.end(), bucket_key) !=
+                   partition_keys_.end();
+        });
+    if (any_in_partition) {
+        return Status::Invalid(fmt::format("Bucket keys {} should not in partition keys {}.",
+                                           bucket_keys, partition_keys_));
+    }
+
+    if (!primary_keys_.empty() && !ObjectUtils::ContainsAll(primary_keys_, bucket_keys)) {
+        return Status::Invalid(fmt::format("Primary keys {} should contain all bucket keys {}.",
+                                           primary_keys_, bucket_keys));
+    }
+    return bucket_keys;
+}
+
+/// Returns true when the table has both primary keys and partition keys, and at least
+/// one partition key is not among the primary keys. Returns false otherwise.
+bool TableSchema::CrossPartitionUpdate() const {
+    const bool keyed_and_partitioned = !primary_keys_.empty() && !partition_keys_.empty();
+    return keyed_and_partitioned && !ObjectUtils::ContainsAll(primary_keys_, partition_keys_);
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/schema/table_schema.h
b/src/paimon/core/schema/table_schema.h
new file mode 100644
index 0000000..c878151
--- /dev/null
+++ b/src/paimon/core/schema/table_schema.h
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+/// Schema of a table, including schemaId and fieldId.
+///
+/// A TableSchema is built either from an arrow schema via Create() or from its JSON form
+/// via CreateFromJson(). It can be serialized back with ToJson() / ToJsonString().
+class TableSchema : public DataSchema, public Jsonizable<TableSchema> {
+ public:
+    /// Schema id of the first schema of a table.
+    static constexpr int64_t FIRST_SCHEMA_ID = 0;
+    /// Schema format versions. FromJson() fills in legacy defaults for old versions:
+    /// - a missing Options::BUCKET becomes "1" when the version is <= PAIMON_07_VERSION;
+    /// - a missing Options::FILE_FORMAT becomes "orc" when the version is <= PAIMON_08_VERSION.
+    static constexpr int32_t PAIMON_07_VERSION = 1;
+    static constexpr int32_t PAIMON_08_VERSION = 2;
+    /// Version carried by schemas built through Create().
+    static constexpr int32_t CURRENT_VERSION = 3;
+
+    /// Creates the schema with id `schema_id` from an arrow schema.
+    ///
+    /// Either every field carries a field id in its DataField::FIELD_ID metadata, or none
+    /// does; a mix of the two is rejected. Missing ids are assigned automatically.
+    /// Primary-key columns become non-nullable.
+    static Result<std::unique_ptr<TableSchema>> Create(
+        int64_t schema_id, const std::shared_ptr<arrow::Schema>& schema,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options);
+
+    /// Parses a schema from its JSON representation (the format produced by ToJson()).
+    static Result<std::unique_ptr<TableSchema>> CreateFromJson(const std::string& json_str);
+
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override;
+
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+    bool operator==(const TableSchema& other) const;
+
+    Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const override;
+
+    /// Returns this schema serialized as a JSON string.
+    Result<std::string> GetJsonSchema() const override {
+        return ToJsonString();
+    }
+
+    std::vector<std::string> FieldNames() const override;
+
+    Result<FieldType> GetFieldType(const std::string& field_name) const override;
+
+    int64_t Id() const override {
+        return id_;
+    }
+    const std::vector<std::string>& PrimaryKeys() const override {
+        return primary_keys_;
+    }
+    const std::vector<std::string>& PartitionKeys() const override {
+        return partition_keys_;
+    }
+
+    /// Effective bucket keys. NOTE(review): tests expect the trimmed primary keys when the
+    /// "bucket-key" option is empty or absent; confirm the fallback for append tables.
+    const std::vector<std::string>& BucketKeys() const override {
+        return bucket_keys_;
+    }
+
+    int32_t NumBuckets() const override {
+        return num_bucket_;
+    }
+    /// Highest field id assigned so far, nested fields included. It is tracked separately
+    /// because some fields may have been deleted.
+    int32_t HighestFieldId() const override {
+        return highest_field_id_;
+    }
+    const std::map<std::string, std::string>& Options() const override {
+        return options_;
+    }
+    /// Top-level fields of the table.
+    const std::vector<DataField>& Fields() const {
+        return fields_;
+    }
+
+    /// Looks up a field by name; returns an error status if it does not exist.
+    Result<DataField> GetField(const std::string& field_name) const;
+
+    /// Looks up a field by field id; returns an error status if it does not exist.
+    Result<DataField> GetField(int32_t field_id) const;
+
+    /// Looks up the fields named in `field_names`.
+    Result<std::vector<DataField>> GetFields(const std::vector<std::string>& field_names) const;
+    /// Primary keys with the partition keys removed. Returns an error status if primary keys
+    /// exist but none remain after trimming.
+    Result<std::vector<std::string>> TrimmedPrimaryKeys() const;
+    /// DataFields of TrimmedPrimaryKeys().
+    Result<std::vector<DataField>> TrimmedPrimaryKeyFields() const;
+    /// Arrow schema built from TrimmedPrimaryKeyFields().
+    Result<std::shared_ptr<arrow::Schema>> TrimmedPrimaryKeySchema() const;
+
+    /// Optional table comment.
+    std::optional<std::string> Comment() const override {
+        return comment_;
+    }
+
+    /// True if the table has both primary and partition keys and some partition key is not
+    /// a primary key.
+    bool CrossPartitionUpdate() const;
+
+ private:
+    // NOTE(review): presumably grants Jsonizable (and the tests) private access and declares
+    // the default constructor used for deserialization; confirm in jsonizable.h.
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(TableSchema);
+
+    // NOTE(review): presumably the common construction/validation path behind the public
+    // factories; confirm in table_schema.cpp.
+    static Result<std::unique_ptr<TableSchema>> InitSchema(
+        int64_t schema_id, const std::vector<DataField>& fields, int32_t highest_field_id,
+        const std::vector<std::string>& partition_keys,
+        const std::vector<std::string>& primary_keys,
+        const std::map<std::string, std::string>& options,
+        const std::optional<std::string>& comment, int64_t time_millis);
+
+    TableSchema(int32_t version, int64_t id, const std::vector<DataField>& fields,
+                int32_t highest_field_id, const std::vector<std::string>& partition_keys,
+                const std::vector<std::string>& primary_keys,
+                const std::map<std::string, std::string>& options,
+                const std::optional<std::string>& comment, int64_t time_millis);
+
+    /// Keys configured via Options::BUCKET_KEY (may be empty). They are validated against
+    /// the field names, the partition keys and (if any) the primary keys.
+    Result<std::vector<std::string>> OriginalBucketKeys() const;
+
+    // NOTE(review): presumably assigns (when `set_field_id`) or reads the field ids of `field`
+    // and its nested children, advancing `*field_id`; confirm in the .cpp.
+    static Result<std::shared_ptr<arrow::Field>> AssignFieldIdsRecursively(
+        const std::shared_ptr<arrow::Field>& field, bool set_field_id, int32_t* field_id);
+
+    // NOTE(review): presumably returns `field`'s metadata with DataField::FIELD_ID set to
+    // `field_id`; confirm in the .cpp.
+    static Result<std::shared_ptr<arrow::KeyValueMetadata>> MakeMetaDataWithFieldId(
+        const std::shared_ptr<arrow::Field>& field, int32_t field_id);
+
+ private:
+    // version of schema for paimon (compared against PAIMON_07_VERSION / PAIMON_08_VERSION)
+    int32_t version_ = -1;
+    // Schema id; FIRST_SCHEMA_ID for the first schema of a table.
+    int64_t id_ = -1;
+    // Top-level fields.
+    std::vector<DataField> fields_;
+    /// Not available from fields, as some fields may have been deleted.
+    int32_t highest_field_id_ = -1;
+    std::vector<std::string> partition_keys_;
+    std::vector<std::string> primary_keys_;
+    // Effective bucket keys returned by BucketKeys().
+    std::vector<std::string> bucket_keys_;
+    // Returned by NumBuckets(). NOTE(review): presumably derived from Options::BUCKET.
+    int32_t num_bucket_ = -1;
+    // Table options as string key/value pairs.
+    std::map<std::string, std::string> options_;
+    std::optional<std::string> comment_;
+    // Schema timestamp in milliseconds; FromJson() passes 0 as the default for "timeMillis".
+    int64_t time_millis_ = -1;
+};
+} // namespace paimon
diff --git a/src/paimon/core/schema/table_schema_test.cpp
b/src/paimon/core/schema/table_schema_test.cpp
new file mode 100644
index 0000000..dedb3bc
--- /dev/null
+++ b/src/paimon/core/schema/table_schema_test.cpp
@@ -0,0 +1,1301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/schema/table_schema.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class TableSchemaTest : public ::testing::Test {
+ public:
+    /// Builds five nullable arrow fields "sub1".."sub5".
+    /// Their types are date32, timestamp[ns], decimal128(23, 5), binary and binary.
+    /// Their DataField::FIELD_ID metadata holds `id`, `id + 1`, ..., `id + 4` respectively.
+    arrow::FieldVector MakeArrowField(int32_t id) const {
+        // One entry per sub field; each field id is `id` plus the entry's position.
+        const std::vector<std::pair<std::string, std::shared_ptr<arrow::DataType>>> specs = {
+            {"sub1", arrow::date32()},
+            {"sub2", arrow::timestamp(arrow::TimeUnit::type::NANO)},
+            {"sub3", arrow::decimal128(23, 5)},
+            {"sub4", arrow::binary()},
+            {"sub5", arrow::binary()}};
+        arrow::FieldVector arrow_fields;
+        arrow_fields.reserve(specs.size());
+        for (size_t i = 0; i < specs.size(); ++i) {
+            auto meta = arrow::KeyValueMetadata::Make(
+                {std::string(DataField::FIELD_ID)},
+                {std::to_string(id + static_cast<int32_t>(i))});
+            arrow_fields.push_back(std::make_shared<arrow::Field>(
+                specs[i].first, specs[i].second, /*nullable=*/true, meta));
+        }
+        return arrow_fields;
+    }
+
+    /// Strips every space, tab and newline so that JSON documents can be compared
+    /// regardless of how they were pretty-printed.
+    static std::string ReplaceAll(const std::string& str) {
+        std::string replaced_str = StringUtils::Replace(str, " ", "");
+        replaced_str = StringUtils::Replace(replaced_str, "\t", "");
+        replaced_str = StringUtils::Replace(replaced_str, "\n", "");
+        return replaced_str;
+    }
+};
+
+// When no arrow field carries a field id, Create() assigns ids 0..13 in column order and
+// records 13 as the highest field id. The primary-key columns f3/f4 become NOT NULL.
+TEST_F(TableSchemaTest, TestCreateWithAllFieldsNotHaveFieldId) {
+    const std::vector<std::string> partition_keys = {"f1", "f2"};
+    const std::vector<std::string> primary_keys = {"f3", "f4"};
+    const std::map<std::string, std::string> options;
+
+    arrow::FieldVector input_fields = {
+        arrow::field("f0", arrow::boolean()),  arrow::field("f1", arrow::int8()),
+        arrow::field("f2", arrow::int8()),     arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int16()),    arrow::field("f5", arrow::int32()),
+        arrow::field("f6", arrow::int32()),    arrow::field("f7", arrow::int64()),
+        arrow::field("f8", arrow::int64()),    arrow::field("f9", arrow::float32()),
+        arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()),
+        arrow::field("f12", arrow::binary()),  arrow::field("non-partition-field", arrow::int32())};
+    ASSERT_OK_AND_ASSIGN(auto created_schema,
+                         TableSchema::Create(/*schema_id=*/0, arrow::schema(input_fields),
+                                             partition_keys, primary_keys, options));
+    ASSERT_TRUE(created_schema);
+
+    const std::vector<DataField> expected_fields = {
+        DataField(0, arrow::field("f0", arrow::boolean())),
+        DataField(1, arrow::field("f1", arrow::int8())),
+        DataField(2, arrow::field("f2", arrow::int8())),
+        DataField(3, arrow::field("f3", arrow::int16(), /*nullable=*/false)),
+        DataField(4, arrow::field("f4", arrow::int16(), /*nullable=*/false)),
+        DataField(5, arrow::field("f5", arrow::int32())),
+        DataField(6, arrow::field("f6", arrow::int32())),
+        DataField(7, arrow::field("f7", arrow::int64())),
+        DataField(8, arrow::field("f8", arrow::int64())),
+        DataField(9, arrow::field("f9", arrow::float32())),
+        DataField(10, arrow::field("f10", arrow::float64())),
+        DataField(11, arrow::field("f11", arrow::utf8())),
+        DataField(12, arrow::field("f12", arrow::binary())),
+        DataField(13, arrow::field("non-partition-field", arrow::int32()))};
+
+    ASSERT_EQ(created_schema->Fields(), expected_fields);
+    ASSERT_EQ(created_schema->Id(), 0);
+    ASSERT_EQ(created_schema->HighestFieldId(), 13);
+    ASSERT_EQ(created_schema->PrimaryKeys(), primary_keys);
+    ASSERT_EQ(created_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(created_schema->Options(), options);
+}
+
+// Create() on arrow fields that already carry field ids 0..4. The primary-key columns
+// sub3/sub4 become NOT NULL.
+TEST_F(TableSchemaTest, TestCreateWithAllFieldsHaveFieldId) {
+    const std::vector<std::string> partition_keys = {"sub1", "sub2"};
+    const std::vector<std::string> primary_keys = {"sub3", "sub4"};
+    const std::map<std::string, std::string> options;
+    ASSERT_OK_AND_ASSIGN(auto created_schema,
+                         TableSchema::Create(/*schema_id=*/0, arrow::schema(MakeArrowField(0)),
+                                             partition_keys, primary_keys, options));
+    ASSERT_TRUE(created_schema);
+
+    const std::vector<DataField> expected_fields = {
+        DataField(0, arrow::field("sub1", arrow::date32())),
+        DataField(1, arrow::field("sub2", arrow::timestamp(arrow::TimeUnit::type::NANO))),
+        DataField(2, arrow::field("sub3", arrow::decimal128(23, 5), /*nullable=*/false)),
+        DataField(3, arrow::field("sub4", arrow::binary(), /*nullable=*/false)),
+        DataField(4, arrow::field("sub5", arrow::binary()))};
+    ASSERT_EQ(created_schema->Fields(), expected_fields);
+    ASSERT_EQ(created_schema->Id(), 0);
+    ASSERT_EQ(created_schema->HighestFieldId(), 4);
+    ASSERT_EQ(created_schema->PrimaryKeys(), primary_keys);
+    ASSERT_EQ(created_schema->PartitionKeys(), partition_keys);
+    ASSERT_EQ(created_schema->Options(), options);
+}
+
+// Create() must reject a mix of fields with a field id (sub1..sub5, ids 3..7) and a field
+// without one (f0).
+TEST_F(TableSchemaTest, TestInvalidCreate) {
+    arrow::FieldVector mixed_fields = MakeArrowField(3);
+    mixed_fields.push_back(arrow::field("f0", arrow::boolean()));
+    const std::vector<std::string> partition_keys = {"sub1", "sub2"};
+    const std::vector<std::string> primary_keys = {"sub3", "sub4"};
+    const std::map<std::string, std::string> no_options;
+    ASSERT_NOK(TableSchema::Create(/*schema_id=*/1, arrow::schema(mixed_fields), partition_keys,
+                                   primary_keys, no_options));
+}
+
+// Parses the schema file of an append table (no primary keys) and compares every member.
+// Then it exercises field lookup by name and by id, including unknown keys.
+TEST_F(TableSchemaTest, TestDeserializeAppendTableSchema) {
+    const std::string schema_file =
+        paimon::test::GetDataDir() + "/orc/append_09.db/append_09/schema/schema-0";
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string content;
+    ASSERT_OK(fs->ReadFile(schema_file, &content));
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+
+    const DataField f0_field(0, arrow::field("f0", arrow::utf8()));
+    const DataField f1_field(1, arrow::field("f1", arrow::int32()));
+    const DataField f2_field(2, arrow::field("f2", arrow::int32()));
+    const DataField f3_field(3, arrow::field("f3", arrow::float64()));
+
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+    expected_schema.fields_ = {f0_field, f1_field, f2_field, f3_field};
+    expected_schema.highest_field_id_ = 3;
+    expected_schema.partition_keys_ = {"f1"};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {
+        {"bucket", "2"}, {"bucket-key", "f2"}, {"manifest.format", "orc"}, {"file.format", "orc"}};
+    expected_schema.time_millis_ = 1721614341162;
+    ASSERT_EQ(*result_schema, expected_schema);
+
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2", "f3"}), result_schema->FieldNames());
+    // Lookup by name and by id resolve to the same field.
+    ASSERT_EQ(f1_field, result_schema->GetField("f1").value());
+    ASSERT_EQ(f1_field, result_schema->GetField(1).value());
+    // Unknown names / ids report a descriptive error.
+    ASSERT_NOK_WITH_MSG(result_schema->GetField("non-exist"),
+                        "Get field non-exist failed: not exist in table schema");
+    ASSERT_NOK_WITH_MSG(result_schema->GetField(100),
+                        "Get field with id 100 failed: not exist in table schema");
+}
+
+// Parses the schema file of a primary-key table and compares every member. Then it checks
+// that trimming the partition key "f1" out of primary keys ["f0", "f1", "f2"] leaves
+// ["f0", "f2"].
+TEST_F(TableSchemaTest, TestDeserializePkTableSchema) {
+    auto fs = std::make_shared<LocalFileSystem>();
+
+    std::string path = paimon::test::GetDataDir() + "/orc/pk_09.db/pk_09/schema/schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+
+    // Primary-key columns are NOT NULL; the value column f3 stays nullable.
+    auto field1 = DataField(0, arrow::field("f0", arrow::utf8(), /*nullable=*/false));
+    auto field2 = DataField(1, arrow::field("f1", arrow::int32(), /*nullable=*/false));
+    auto field3 = DataField(2, arrow::field("f2", arrow::int32(), /*nullable=*/false));
+    auto field4 = DataField(3, arrow::field("f3", arrow::float64(), /*nullable=*/true));
+    expected_schema.fields_ = {field1, field2, field3, field4};
+
+    expected_schema.highest_field_id_ = 3;
+    expected_schema.partition_keys_ = {"f1"};
+    expected_schema.primary_keys_ = {"f0", "f1", "f2"};
+    expected_schema.options_ = {{"bucket", "2"},
+                                {"bucket-key", "f2"},
+                                {"manifest.format", "orc"},
+                                {"file.format", "orc"},
+                                {"deletion-vectors.enabled", "true"},
+                                {"commit.force-compact", "true"}};
+    expected_schema.time_millis_ = 1725534144802;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2", "f3"}), result_schema->FieldNames());
+    ASSERT_OK_AND_ASSIGN(auto trimmed_primary_keys, result_schema->TrimmedPrimaryKeys());
+    ASSERT_EQ(trimmed_primary_keys, std::vector<std::string>({"f0", "f2"}));
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_primary_key_fields,
+                         result_schema->GetFields(trimmed_primary_keys));
+    // size() is size_t: compare it with an unsigned literal so gtest's comparison helper does
+    // not mix signed and unsigned operands (-Wsign-compare).
+    ASSERT_EQ(trimmed_primary_key_fields.size(), 2u);
+    ASSERT_EQ(trimmed_primary_key_fields[0], field1);
+    ASSERT_EQ(trimmed_primary_key_fields[1], field3);
+}
+
+// With partition key "pt" and primary keys ["pt", "id"], the trimmed primary key is just
+// "id". It is non-nullable both as a DataField and in the arrow schema.
+TEST_F(TableSchemaTest, TestTrimmedPrimaryKeyInterfaces) {
+    arrow::FieldVector fields = {
+        arrow::field("pt", arrow::utf8()),
+        arrow::field("id", arrow::int64()),
+        arrow::field("val", arrow::int32()),
+    };
+    auto schema = arrow::schema(fields);
+
+    ASSERT_OK_AND_ASSIGN(auto table_schema, TableSchema::Create(/*schema_id=*/0, schema,
+                                                                /*partition_keys=*/{"pt"},
+                                                                /*primary_keys=*/{"pt", "id"},
+                                                                /*options=*/{}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<std::string> trimmed_primary_keys,
+                         table_schema->TrimmedPrimaryKeys());
+    ASSERT_EQ(trimmed_primary_keys, std::vector<std::string>({"id"}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_primary_key_fields,
+                         table_schema->TrimmedPrimaryKeyFields());
+    // Unsigned literal: avoids a signed/unsigned comparison inside gtest (-Wsign-compare).
+    ASSERT_EQ(trimmed_primary_key_fields.size(), 1u);
+    ASSERT_EQ(trimmed_primary_key_fields[0].Name(), "id");
+    ASSERT_FALSE(trimmed_primary_key_fields[0].Nullable());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Schema> trimmed_primary_key_schema,
+                         table_schema->TrimmedPrimaryKeySchema());
+    ASSERT_EQ(trimmed_primary_key_schema->num_fields(), 1);
+    ASSERT_EQ(trimmed_primary_key_schema->field(0)->name(), "id");
+    ASSERT_FALSE(trimmed_primary_key_schema->field(0)->nullable());
+}
+
+// Without primary keys, all three trimmed-primary-key accessors succeed and return
+// empty results.
+TEST_F(TableSchemaTest, TestTrimmedPrimaryKeyInterfacesWithoutPrimaryKeys) {
+    const arrow::FieldVector columns = {arrow::field("pt", arrow::utf8()),
+                                        arrow::field("val", arrow::int32())};
+    ASSERT_OK_AND_ASSIGN(auto table_schema,
+                         TableSchema::Create(/*schema_id=*/0, arrow::schema(columns),
+                                             /*partition_keys=*/{"pt"},
+                                             /*primary_keys=*/{},
+                                             /*options=*/{}));
+
+    ASSERT_OK_AND_ASSIGN(std::vector<std::string> trimmed_keys,
+                         table_schema->TrimmedPrimaryKeys());
+    ASSERT_TRUE(trimmed_keys.empty());
+
+    ASSERT_OK_AND_ASSIGN(std::vector<DataField> trimmed_fields,
+                         table_schema->TrimmedPrimaryKeyFields());
+    ASSERT_TRUE(trimmed_fields.empty());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Schema> trimmed_schema,
+                         table_schema->TrimmedPrimaryKeySchema());
+    ASSERT_EQ(trimmed_schema->num_fields(), 0);
+}
+
+// Parses a schema with timestamp columns of every precision, both without and with a time
+// zone. The time-zone variants are expected to carry the local time zone name.
+TEST_F(TableSchemaTest, TestDeserializeAppendTableSchemaWithTimestamp) {
+    const std::string schema_file = paimon::test::GetDataDir() +
+                                    "/orc/append_with_multiple_ts_precision_and_timezone.db/"
+                                    "append_with_multiple_ts_precision_and_timezone/schema/schema-0";
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string content;
+    ASSERT_OK(fs->ReadFile(schema_file, &content));
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+
+    // Ids 0..3: ts_{sec,milli,micro,nano} without time zone.
+    // Ids 4..7: ts_tz_{sec,milli,micro,nano} with the local time zone.
+    const std::vector<arrow::TimeUnit::type> units = {
+        arrow::TimeUnit::SECOND, arrow::TimeUnit::MILLI, arrow::TimeUnit::MICRO,
+        arrow::TimeUnit::NANO};
+    const std::vector<std::string> suffixes = {"sec", "milli", "micro", "nano"};
+    const auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    std::vector<DataField> expected_fields;
+    for (size_t i = 0; i < units.size(); ++i) {
+        expected_fields.emplace_back(static_cast<int32_t>(i),
+                                     arrow::field("ts_" + suffixes[i], arrow::timestamp(units[i])));
+    }
+    for (size_t i = 0; i < units.size(); ++i) {
+        expected_fields.emplace_back(
+            static_cast<int32_t>(units.size() + i),
+            arrow::field("ts_tz_" + suffixes[i], arrow::timestamp(units[i], timezone)));
+    }
+
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+    expected_schema.fields_ = expected_fields;
+    expected_schema.highest_field_id_ = 7;
+    expected_schema.partition_keys_ = {};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {{"bucket", "-1"},
+                                {"manifest.format", "orc"},
+                                {"file.format", "orc"},
+                                {"orc.timestamp-ltz.legacy.type", "false"}};
+    expected_schema.time_millis_ = 1757927622210;
+    ASSERT_EQ(*result_schema, expected_schema);
+
+    ASSERT_EQ(std::vector<std::string>({"ts_sec", "ts_milli", "ts_micro", "ts_nano", "ts_tz_sec",
+                                        "ts_tz_milli", "ts_tz_micro", "ts_tz_nano"}),
+              result_schema->FieldNames());
+}
+
+// Parses a schema with ROW, ARRAY<ROW> and MAP<ROW, ROW> columns. Nested fields consume
+// field ids too, so the top-level ids are 0, 6 and 12 and the highest field id is 22.
+TEST_F(TableSchemaTest, TestDeserializeWithNestedDataType) {
+    const std::string schema_file =
+        paimon::test::GetDataDir() +
+        "/orc/append_table_with_nested_type.db/append_table_with_nested_type/schema/schema-0";
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string content;
+    ASSERT_OK(fs->ReadFile(schema_file, &content));
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> result_schema,
+                         TableSchema::CreateFromJson(content));
+
+    TableSchema expected_schema;
+    expected_schema.version_ = 3;
+    expected_schema.id_ = 0;
+    expected_schema.fields_ = {
+        DataField(0, arrow::field("f0", arrow::struct_(MakeArrowField(1)))),
+        DataField(6, arrow::field("f1", arrow::list(arrow::struct_(MakeArrowField(7))))),
+        DataField(12, arrow::field("f2", arrow::map(arrow::struct_(MakeArrowField(13)),
+                                                    arrow::struct_(MakeArrowField(18)))))};
+    expected_schema.highest_field_id_ = 22;
+    expected_schema.partition_keys_ = {};
+    expected_schema.primary_keys_ = {};
+    expected_schema.options_ = {{"manifest.format", "orc"}, {"file.format", "orc"}};
+    expected_schema.time_millis_ = 1729759141146;
+
+    ASSERT_EQ(*result_schema, expected_schema);
+    ASSERT_EQ(std::vector<std::string>({"f0", "f1", "f2"}), result_schema->FieldNames());
+}
+
+// Empty bucket key: the trimmed pk is used as the bucket key. Primary keys ["f1", "f2"]
+// minus partition key "f1" give ["f2"], whether "bucket-key" is set to "" or is absent.
+TEST_F(TableSchemaTest, TestEmptyBucketKey) {
+    const std::vector<std::string> table_schema_strs = {
+        // "bucket-key" present but empty.
+        R"({
+ "version" : 3,
+ "id" : 0,
+ "fields" : [ {
+ "id" : 0,
+ "name" : "f0",
+ "type" : "STRING"
+ }, {
+ "id" : 1,
+ "name" : "f1",
+ "type" : "INT"
+ }, {
+ "id" : 2,
+ "name" : "f2",
+ "type" : "INT"
+ }, {
+ "id" : 3,
+ "name" : "f3",
+ "type" : "DOUBLE"
+ } ],
+ "highestFieldId" : 3,
+ "partitionKeys" : [ "f1"],
+ "primaryKeys" : [ "f1", "f2" ],
+ "options" : {
+ "bucket" : "2",
+ "bucket-key" : "",
+ "manifest.format" : "orc",
+ "file.format" : "orc"
+ },
+ "timeMillis" : 1721614341162
+ })",
+        // "bucket-key" absent.
+        R"({
+ "version" : 3,
+ "id" : 0,
+ "fields" : [ {
+ "id" : 0,
+ "name" : "f0",
+ "type" : "STRING"
+ }, {
+ "id" : 1,
+ "name" : "f1",
+ "type" : "INT"
+ }, {
+ "id" : 2,
+ "name" : "f2",
+ "type" : "INT"
+ }, {
+ "id" : 3,
+ "name" : "f3",
+ "type" : "DOUBLE"
+ } ],
+ "highestFieldId" : 3,
+ "partitionKeys" : [ "f1"],
+ "primaryKeys" : [ "f1", "f2" ],
+ "options" : {
+ "bucket" : "2",
+ "manifest.format" : "orc",
+ "file.format" : "orc"
+ },
+ "timeMillis" : 1721614341162
+ })"};
+    for (const auto& table_schema_str : table_schema_strs) {
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> schema_result,
+                             TableSchema::CreateFromJson(table_schema_str));
+        ASSERT_EQ(std::vector<std::string>({"f2"}), schema_result->bucket_keys_);
+    }
+}
+
+// Serializes a freshly created schema and compares it with the expected JSON, ignoring
+// whitespace.
+TEST_F(TableSchemaTest, TestToJson) {
+    auto field1 = DataField(0, arrow::field("f0", arrow::utf8()));
+    auto field2 = DataField(1, arrow::field("f1", arrow::int32()));
+    auto field3 = DataField(2, arrow::field("f2", arrow::int32()));
+    auto field4 = DataField(3, arrow::field("f3", arrow::float64()));
+    // Spell out the container: `auto x = {...}` would deduce std::initializer_list<DataField>.
+    std::vector<DataField> data_fields = {field1, field2, field3, field4};
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto schema, TableSchema::Create(/*schema_id=*/0, arrow_schema, /*partition_keys=*/{"f1"},
+                                         /*primary_keys=*/{},
+                                         {{"manifest.format", "orc"}, {"file.format", "orc"}}));
+    // Pin the timestamp so the serialized output is deterministic.
+    schema->time_millis_ = 1721614341162;
+    ASSERT_OK_AND_ASSIGN(std::string json_str, schema->ToJsonString());
+    std::string table_schema_str = R"({
+ "version": 3,
+ "id": 0,
+ "fields": [
+ {
+ "id": 0,
+ "name": "f0",
+ "type": "STRING"
+ },
+ {
+ "id": 1,
+ "name": "f1",
+ "type": "INT"
+ },
+ {
+ "id": 2,
+ "name": "f2",
+ "type": "INT"
+ },
+ {
+ "id": 3,
+ "name": "f3",
+ "type": "DOUBLE"
+ }
+ ],
+ "highestFieldId": 3,
+ "partitionKeys": [
+ "f1"
+ ],
+ "primaryKeys": [],
+ "options": {
+ "file.format": "orc",
+ "manifest.format": "orc"
+ },
+ "timeMillis": 1721614341162
+})";
+    ASSERT_EQ(ReplaceAll(table_schema_str), ReplaceAll(json_str));
+}
+
+// Round trip: parse a schema that has primary keys, a bucket option and a table comment,
+// serialize it again, and expect the same JSON up to whitespace.
+TEST_F(TableSchemaTest, TestToJson2) {
+    const std::string table_schema_str = R"({
+ "version" : 3,
+ "id" : 0,
+ "fields" : [ {
+ "id" : 0,
+ "name" : "f0",
+ "type" : "STRING"
+ }, {
+ "id" : 1,
+ "name" : "f1",
+ "type" : "INT"
+ }, {
+ "id" : 2,
+ "name" : "f2",
+ "type" : "INT"
+ }, {
+ "id" : 3,
+ "name" : "f3",
+ "type" : "DOUBLE"
+ } ],
+ "highestFieldId" : 3,
+ "partitionKeys" : [ "f1"],
+ "primaryKeys" : [ "f1", "f2" ],
+ "options" : {
+ "bucket" : "2",
+ "file.format" : "orc",
+ "manifest.format" : "orc"
+ },
+ "comment" : "this is a comment",
+ "timeMillis" : 1721614341162
+ })";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> parsed_schema,
+                         TableSchema::CreateFromJson(table_schema_str));
+    ASSERT_OK_AND_ASSIGN(std::string serialized, parsed_schema->ToJsonString());
+    ASSERT_EQ(ReplaceAll(table_schema_str), ReplaceAll(serialized));
+}
+
+// Round-trips a schema file with MAP, ARRAY and ROW columns whose field ids were assigned
+// when it was written, and checks the serialized JSON (whitespace-insensitive).
+TEST_F(TableSchemaTest, TestToJson3) {
+    const std::string schema_file = paimon::test::GetDataDir() +
+                                    "/orc/append_complex_build_in_fieldid.db/"
+                                    "append_complex_build_in_fieldid/schema/schema-0";
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string file_content;
+    ASSERT_OK(fs->ReadFile(schema_file, &file_content));
+
+    const std::string expected_str = R"==({
+ "version": 3,
+ "id": 0,
+ "fields": [
+ {
+ "id": 0,
+ "name": "f1",
+ "type": {
+ "type": "MAP",
+ "key": "TINYINT NOT NULL",
+ "value": "SMALLINT"
+ }
+ },
+ {
+ "id": 1,
+ "name": "f2",
+ "type": {
+ "type": "ARRAY",
+ "element": "FLOAT"
+ }
+ },
+ {
+ "id": 2,
+ "name": "f3",
+ "type": {
+ "type": "ROW",
+ "fields": [
+ {
+ "id": 3,
+ "name": "f0",
+ "type": "BOOLEAN"
+ },
+ {
+ "id": 4,
+ "name": "f1",
+ "type": "BIGINT"
+ }
+ ]
+ }
+ },
+ {
+ "id": 5,
+ "name": "f4",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 6,
+ "name": "f5",
+ "type": "DATE"
+ },
+ {
+ "id": 7,
+ "name": "f6",
+ "type": "DECIMAL(2, 2)"
+ }
+ ],
+ "highestFieldId": 7,
+ "partitionKeys": [],
+ "primaryKeys": [],
+ "options": {
+ "file.format": "orc",
+ "manifest.format": "orc"
+ },
+ "timeMillis": 1732079677062
+})==";
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> parsed,
+                         TableSchema::CreateFromJson(file_content));
+    ASSERT_OK_AND_ASSIGN(std::string json_str, parsed->ToJsonString());
+    ASSERT_EQ(ReplaceAll(expected_str), ReplaceAll(json_str));
+}
+
+// Creates a schema with MAP, ARRAY and ROW columns whose field ids are given explicitly.
+// The serialized JSON is checked whitespace-insensitively.
+TEST_F(TableSchemaTest, TestToJsonWithNestedField0) {
+    auto map_type = arrow::map(arrow::int8(), arrow::int16());
+    auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField(
+        DataField(536871936, arrow::field("item", arrow::float32()))));
+    std::vector<DataField> struct_fields = {DataField(3, arrow::field("f0", arrow::boolean())),
+                                            DataField(4, arrow::field("f1", arrow::int64()))};
+    auto struct_type = DataField::ConvertDataFieldsToArrowStructType(struct_fields);
+    // Spell out the container: `auto x = {...}` would deduce std::initializer_list<DataField>.
+    std::vector<DataField> data_fields = {
+        DataField(0, arrow::field("f1", map_type)),
+        DataField(1, arrow::field("f2", list_type)),
+        DataField(2, arrow::field("f3", struct_type)),
+        DataField(5, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO))),
+        DataField(6, arrow::field("f5", arrow::date32())),
+        DataField(7, arrow::field("f6", arrow::decimal128(2, 2)))};
+
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+    ASSERT_OK_AND_ASSIGN(auto schema,
+                         TableSchema::Create(/*schema_id=*/0, arrow_schema, /*partition_keys=*/{},
+                                             /*primary_keys=*/{},
+                                             {{"manifest.format", "orc"}, {"file.format", "orc"}}));
+
+    // Pin the timestamp so the serialized output is deterministic.
+    schema->time_millis_ = 1721614341162;
+    ASSERT_OK_AND_ASSIGN(std::string json_str, schema->ToJsonString());
+    std::string expected_schema = R"==({
+ "version": 3,
+ "id": 0,
+ "fields": [
+ {
+ "id": 0,
+ "name": "f1",
+ "type": {
+ "type": "MAP",
+ "key": "TINYINT NOT NULL",
+ "value": "SMALLINT"
+ }
+ },
+ {
+ "id": 1,
+ "name": "f2",
+ "type": {
+ "type": "ARRAY",
+ "element": "FLOAT"
+ }
+ },
+ {
+ "id": 2,
+ "name": "f3",
+ "type": {
+ "type": "ROW",
+ "fields": [
+ {
+ "id": 3,
+ "name": "f0",
+ "type": "BOOLEAN"
+ },
+ {
+ "id": 4,
+ "name": "f1",
+ "type": "BIGINT"
+ }
+ ]
+ }
+ },
+ {
+ "id": 5,
+ "name": "f4",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 6,
+ "name": "f5",
+ "type": "DATE"
+ },
+ {
+ "id": 7,
+ "name": "f6",
+ "type": "DECIMAL(2, 2)"
+ }
+ ],
+ "highestFieldId": 7,
+ "partitionKeys": [],
+ "primaryKeys": [],
+ "options": {
+ "file.format": "orc",
+ "manifest.format": "orc"
+ },
+ "timeMillis": 1721614341162
+})==";
+    ASSERT_EQ(ReplaceAll(expected_schema), ReplaceAll(json_str));
+}
+
+// Two checks on the nested-type schema:
+// 1. Round-trip the schema file and check the JSON.
+// 2. Build the same shape from a plain arrow schema (no field ids) and check that
+//    Create() assigns the same nested field ids, so both produce equivalent JSON.
+TEST_F(TableSchemaTest, TestToJsonWithNestedField1) {
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string path = paimon::test::GetDataDir() +
+                       "/orc/append_table_with_nested_type.db/append_table_with_nested_type/schema/"
+                       "schema-0";
+    std::string content;
+    ASSERT_OK(fs->ReadFile(path, &content));
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableSchema> st, TableSchema::CreateFromJson(content));
+    ASSERT_OK_AND_ASSIGN(std::string json_str, st->ToJsonString());
+    std::string expected_str = R"==({
+ "version": 3,
+ "id": 0,
+ "fields": [
+ {
+ "id": 0,
+ "name": "f0",
+ "type": {
+ "type": "ROW",
+ "fields": [
+ {
+ "id": 1,
+ "name": "sub1",
+ "type": "DATE"
+ },
+ {
+ "id": 2,
+ "name": "sub2",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 3,
+ "name": "sub3",
+ "type": "DECIMAL(23, 5)"
+ },
+ {
+ "id": 4,
+ "name": "sub4",
+ "type": "BYTES"
+ },
+ {
+ "id": 5,
+ "name": "sub5",
+ "type": "BYTES"
+ }
+ ]
+ }
+ },
+ {
+ "id": 6,
+ "name": "f1",
+ "type": {
+ "type": "ARRAY",
+ "element": {
+ "type": "ROW",
+ "fields": [
+ {
+ "id": 7,
+ "name": "sub1",
+ "type": "DATE"
+ },
+ {
+ "id": 8,
+ "name": "sub2",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 9,
+ "name": "sub3",
+ "type": "DECIMAL(23, 5)"
+ },
+ {
+ "id": 10,
+ "name": "sub4",
+ "type": "BYTES"
+ },
+ {
+ "id": 11,
+ "name": "sub5",
+ "type": "BYTES"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "id": 12,
+ "name": "f2",
+ "type": {
+ "type": "MAP",
+ "key": {
+ "type": "ROW NOT NULL",
+ "fields": [
+ {
+ "id": 13,
+ "name": "sub1",
+ "type": "DATE"
+ },
+ {
+ "id": 14,
+ "name": "sub2",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 15,
+ "name": "sub3",
+ "type": "DECIMAL(23, 5)"
+ },
+ {
+ "id": 16,
+ "name": "sub4",
+ "type": "BYTES"
+ },
+ {
+ "id": 17,
+ "name": "sub5",
+ "type": "BYTES"
+ }
+ ]
+ },
+ "value": {
+ "type": "ROW",
+ "fields": [
+ {
+ "id": 18,
+ "name": "sub1",
+ "type": "DATE"
+ },
+ {
+ "id": 19,
+ "name": "sub2",
+ "type": "TIMESTAMP(9)"
+ },
+ {
+ "id": 20,
+ "name": "sub3",
+ "type": "DECIMAL(23, 5)"
+ },
+ {
+ "id": 21,
+ "name": "sub4",
+ "type": "BYTES"
+ },
+ {
+ "id": 22,
+ "name": "sub5",
+ "type": "BYTES"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "highestFieldId": 22,
+ "partitionKeys": [],
+ "primaryKeys": [],
+ "options": {
+ "file.format": "orc",
+ "manifest.format": "orc"
+ },
+ "timeMillis": 1729759141146
+})==";
+
+    ASSERT_EQ(ReplaceAll(expected_str), ReplaceAll(json_str));
+    // test create from arrow schema, test set field id
+    auto f0 = arrow::field(
+        "f0", arrow::struct_({arrow::field("sub1", arrow::date32()),
+                              arrow::field("sub2", arrow::timestamp(arrow::TimeUnit::NANO)),
+                              arrow::field("sub3", arrow::decimal128(23, 5)),
+                              arrow::field("sub4", arrow::binary()),
+                              arrow::field("sub5", arrow::binary())}));
+    auto f1 = arrow::field("f1", arrow::list(f0));
+    auto f2 = arrow::field("f2", arrow::map(f0->type(), f0->type()));
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto arrow_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(
+        auto new_table_schema,
+        TableSchema::Create(0, arrow_schema, /*partition_keys=*/{}, /*primary_keys=*/{},
+                            /*options=*/{{"file.format", "orc"}, {"manifest.format", "orc"}}));
+    new_table_schema->time_millis_ = 1729759141146;
+    ASSERT_OK_AND_ASSIGN(std::string new_json_str, new_table_schema->ToJsonString());
+    // Compare whitespace-insensitively, like the round-trip check above. The raw string's
+    // indentation is incidental and need not match the JSON writer byte-for-byte.
+    ASSERT_EQ(ReplaceAll(expected_str), ReplaceAll(new_json_str)) << new_json_str;
+}
+
+TEST_F(TableSchemaTest, TestInvalidSchema) {
+    // Every case shares the same four fields (f0 STRING, f1 INT, f2 INT, f3 DOUBLE), primary
+    // keys [f1, f2], "bucket" = "2" and ORC formats; only the partition-key list (given as the
+    // inside of a JSON array) and the "bucket-key" option differ.
+    auto make_schema_json = [](const std::string& partition_keys, const std::string& bucket_key) {
+        return std::string(R"({
+        "version" : 3,
+        "id" : 0,
+        "fields" : [ {
+            "id" : 0,
+            "name" : "f0",
+            "type" : "STRING"
+        }, {
+            "id" : 1,
+            "name" : "f1",
+            "type" : "INT"
+        }, {
+            "id" : 2,
+            "name" : "f2",
+            "type" : "INT"
+        }, {
+            "id" : 3,
+            "name" : "f3",
+            "type" : "DOUBLE"
+        } ],
+        "highestFieldId" : 3,
+        "partitionKeys" : [ )") +
+               partition_keys + R"( ],
+        "primaryKeys" : [ "f1", "f2" ],
+        "options" : {
+            "bucket" : "2",
+            "bucket-key" : ")" +
+               bucket_key + R"(",
+            "manifest.format" : "orc",
+            "file.format" : "orc"
+        },
+        "timeMillis" : 1721614341162
+    })";
+    };
+
+    struct InvalidCase {
+        const char* description;
+        const char* partition_keys;
+        const char* bucket_key;
+        const char* expected_error;  // substring expected in the returned error
+    };
+    const std::vector<InvalidCase> cases = {
+        {"partition key same as primary key", R"("f1", "f2")", "f2",
+         "this will result in only one record in a partition"},
+        {"bucket key is non-exist in fields", R"("f1")", "non-exist",
+         "should contain all bucket keys"},
+        {"bucket key in partition key", R"("f1")", "f1,f3", "should not in partition keys"},
+        {"bucket key is not a subset of primary key", R"("f1")", "f3",
+         "should contain all bucket keys"},
+    };
+
+    for (const auto& c : cases) {
+        // Label any failure with the case that produced it.
+        SCOPED_TRACE(c.description);
+        ASSERT_NOK_WITH_MSG(
+            TableSchema::CreateFromJson(make_schema_json(c.partition_keys, c.bucket_key)),
+            c.expected_error);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdBasicType) {
+    auto int_field = arrow::field("column1", arrow::int32());
+    {
+        // set_field_id=true: the primitive field is tagged with id 0 and the counter moves to 1.
+        int32_t next_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> assigned,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 int_field, /*set_field_id=*/true, &next_id));
+        ASSERT_TRUE(assigned->metadata());
+        ASSERT_EQ(assigned->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        ASSERT_EQ(next_id, 1);
+    }
+    {
+        // set_field_id=false: no metadata is attached and the counter is left untouched.
+        int32_t next_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> assigned,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 int_field, /*set_field_id=*/false, &next_id));
+        ASSERT_FALSE(assigned->metadata());
+        ASSERT_EQ(next_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdStructType) {
+    auto child1 = arrow::field("child1", arrow::int32());
+    auto child2 = arrow::field("child2", arrow::float64());
+    auto struct_field = arrow::field("parent", arrow::struct_({child1, child2}));
+    {
+        // set_field_id=true: the parent takes id 0 and its children take 1 and 2.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(struct_field, /*set_field_id=*/true, &field_id));
+        auto struct_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_EQ(struct_type->num_fields(), 2);
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_TRUE(struct_type->field(0)->metadata());
+        ASSERT_TRUE(struct_type->field(1)->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "1");
+        ASSERT_EQ(struct_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "2");
+        ASSERT_EQ(field_id, 3);
+    }
+    {
+        // set_field_id=false: the parent gets no id, but its children are still numbered 0 and 1.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 struct_field, /*set_field_id=*/false, &field_id));
+        auto struct_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_EQ(struct_type->num_fields(), 2);
+        ASSERT_FALSE(new_field->metadata());
+        ASSERT_TRUE(struct_type->field(0)->metadata());
+        ASSERT_TRUE(struct_type->field(1)->metadata());
+        ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        ASSERT_EQ(struct_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "1");
+        ASSERT_EQ(field_id, 2);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdListType) {
+    auto value_field = arrow::field("values", arrow::int32());
+    auto list_field = arrow::field("list_column", arrow::list(value_field));
+    {
+        // set_field_id=true: only the list column itself gets an id; its element field does not.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(list_field, /*set_field_id=*/true, &field_id));
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(new_field->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+    {
+        // set_field_id=false: neither the column nor its element field receives an id.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(list_field, /*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(new_field->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdMapType) {
+    auto map_field = arrow::field("map_column", arrow::map(arrow::utf8(), arrow::int32()));
+    {
+        // set_field_id=true: the map column takes id 0; its key and item fields get no id.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, /*set_field_id=*/true, &field_id));
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());
+        ASSERT_FALSE(value_field->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+    {
+        // set_field_id=false: no id is attached anywhere and the counter is unchanged.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, /*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+        ASSERT_FALSE(key_field->metadata());
+        ASSERT_FALSE(value_field->metadata());
+        ASSERT_EQ(field_id, 0);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdMapWithStruct) {
+    auto inner_child1 = arrow::field("inner1", arrow::int32());
+    auto inner_child2 = arrow::field("inner2", arrow::float64());
+    auto map_field =
+        arrow::field("map_column", arrow::map(arrow::struct_({inner_child1, inner_child2}),
+                                              arrow::struct_({inner_child1, inner_child2})));
+    {
+        // set_field_id=true: the map column takes id 0; the key/item fields get no id, but the
+        // struct children behind them are numbered 1-2 (key) and 3-4 (value).
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, /*set_field_id=*/true, &field_id));
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+
+        ASSERT_FALSE(key_field->metadata());
+        auto key_inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(key_field->type());
+        ASSERT_TRUE(key_inner_type->field(0)->metadata());
+        ASSERT_TRUE(key_inner_type->field(1)->metadata());
+        ASSERT_EQ(key_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "1");
+        ASSERT_EQ(key_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "2");
+
+        ASSERT_FALSE(value_field->metadata());
+        auto value_inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(value_field->type());
+        ASSERT_TRUE(value_inner_type->field(0)->metadata());
+        ASSERT_TRUE(value_inner_type->field(1)->metadata());
+        ASSERT_EQ(value_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "3");
+        ASSERT_EQ(value_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "4");
+        ASSERT_EQ(field_id, 5);
+    }
+    {
+        // set_field_id=false: the map column gets no id; the struct children are numbered 0-3.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(map_field, /*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(new_field->type());
+        std::shared_ptr<arrow::Field> key_field = map_type->key_field();
+        std::shared_ptr<arrow::Field> value_field = map_type->item_field();
+
+        ASSERT_FALSE(key_field->metadata());
+        auto key_inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(key_field->type());
+        ASSERT_TRUE(key_inner_type->field(0)->metadata());
+        ASSERT_TRUE(key_inner_type->field(1)->metadata());
+        ASSERT_EQ(key_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "0");
+        ASSERT_EQ(key_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "1");
+
+        ASSERT_FALSE(value_field->metadata());
+        auto value_inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(value_field->type());
+        ASSERT_TRUE(value_inner_type->field(0)->metadata());
+        ASSERT_TRUE(value_inner_type->field(1)->metadata());
+        ASSERT_EQ(value_inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "2");
+        ASSERT_EQ(value_inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(),
+                  "3");
+        ASSERT_EQ(field_id, 4);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdNestedStruct) {
+    auto inner_child1 = arrow::field("inner1", arrow::int32());
+    auto inner_child2 = arrow::field("inner2", arrow::float64());
+    auto inner_struct = arrow::field("inner_struct", arrow::struct_({inner_child1, inner_child2}));
+    auto outer_struct = arrow::field("outer_struct", arrow::struct_({inner_struct}));
+    {
+        // set_field_id=true: ids are handed out outer -> inner_struct -> inner1 -> inner2 (0..3).
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(outer_struct, /*set_field_id=*/true, &field_id));
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto outer_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_TRUE(outer_type->field(0)->metadata());
+        ASSERT_EQ(outer_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "1");
+        auto inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(outer_type->field(0)->type());
+        ASSERT_TRUE(inner_type->field(0)->metadata());
+        ASSERT_TRUE(inner_type->field(1)->metadata());
+        ASSERT_EQ(inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "2");
+        ASSERT_EQ(inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "3");
+        ASSERT_EQ(field_id, 4);
+    }
+    {
+        // set_field_id=false: the outer field gets no id; nested fields are numbered 0..2.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 outer_struct, /*set_field_id=*/false, &field_id));
+        ASSERT_FALSE(new_field->metadata());
+        auto outer_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_TRUE(outer_type->field(0)->metadata());
+        ASSERT_EQ(outer_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto inner_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(outer_type->field(0)->type());
+        ASSERT_TRUE(inner_type->field(0)->metadata());
+        ASSERT_TRUE(inner_type->field(1)->metadata());
+        ASSERT_EQ(inner_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "1");
+        ASSERT_EQ(inner_type->field(1)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "2");
+        ASSERT_EQ(field_id, 3);
+    }
+}
+
+TEST_F(TableSchemaTest, SetFieldIdNestedListInStruct) {
+    auto list_value_field = arrow::field("list_values", arrow::int32());
+    auto list_field = arrow::field("list_column", arrow::list(list_value_field));
+    auto struct_field = arrow::field("struct_with_list", arrow::struct_({list_field}));
+    {
+        // set_field_id=true: struct takes 0, list column takes 1, list element gets no id.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::Field> new_field,
+            TableSchema::AssignFieldIdsRecursively(struct_field, /*set_field_id=*/true, &field_id));
+        auto struct_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        // Check metadata presence first so a missing id fails the test instead of crashing it.
+        ASSERT_TRUE(new_field->metadata());
+        ASSERT_TRUE(struct_type->field(0)->metadata());
+        ASSERT_EQ(new_field->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "1");
+        auto list_type =
+            arrow::internal::checked_pointer_cast<arrow::ListType>(struct_type->field(0)->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 2);
+    }
+    {
+        // set_field_id=false: the struct gets no id; the list column is numbered 0.
+        int32_t field_id = 0;
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Field> new_field,
+                             TableSchema::AssignFieldIdsRecursively(
+                                 struct_field, /*set_field_id=*/false, &field_id));
+        auto struct_type =
+            arrow::internal::checked_pointer_cast<arrow::StructType>(new_field->type());
+        ASSERT_FALSE(new_field->metadata());
+        ASSERT_TRUE(struct_type->field(0)->metadata());
+        ASSERT_EQ(struct_type->field(0)->metadata()->Get(DataField::FIELD_ID).ValueOrDie(), "0");
+        auto list_type =
+            arrow::internal::checked_pointer_cast<arrow::ListType>(struct_type->field(0)->type());
+        ASSERT_FALSE(list_type->value_field()->metadata());
+        ASSERT_EQ(field_id, 1);
+    }
+}
+
+TEST_F(TableSchemaTest, MapKeyMustBeNotNull) {
+    // A MAP column whose key type is a plain (nullable) "TINYINT" must be rejected on parse.
+    std::string nullable_key_schema = R"({
+        "version" : 3, "id" : 0,
+        "fields" : [ {
+            "id" : 0, "name" : "f0",
+            "type" : { "type" : "MAP", "key" : "TINYINT", "value" : "SMALLINT" }
+        } ],
+        "highestFieldId" : 0,
+        "partitionKeys" : [], "primaryKeys" : [], "options" : {},
+        "timeMillis" : 1721614341162
+    })";
+    auto parsed = TableSchema::CreateFromJson(nullable_key_schema);
+    ASSERT_NOK_WITH_MSG(parsed, "Map field 'f0' has a nullable key.");
+}
+
+TEST_F(TableSchemaTest, CrossPartitionUpdate) {
+    auto arrow_schema = arrow::schema(arrow::FieldVector{
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()),
+        arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())});
+    // Partition key f3 is not part of the primary keys [f1, f2], so the schema is expected to
+    // report a cross-partition update.
+    std::vector<std::string> pk_names = {"f1", "f2"};
+    std::vector<std::string> partition_names = {"f2", "f3"};
+    std::map<std::string, std::string> no_options;
+    ASSERT_OK_AND_ASSIGN(auto created,
+                         TableSchema::Create(/*schema_id=*/0, arrow_schema, partition_names,
+                                             pk_names, no_options));
+    ASSERT_TRUE(created);
+    ASSERT_TRUE(created->CrossPartitionUpdate());
+}
+
+TEST_F(TableSchemaTest, CrossPartitionUpdate2) {
+    auto arrow_schema = arrow::schema(arrow::FieldVector{
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()),
+        arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())});
+    // Every partition key (f2, f3) is also a primary key, so no cross-partition update is
+    // expected.
+    std::vector<std::string> pk_names = {"f1", "f2", "f3"};
+    std::vector<std::string> partition_names = {"f2", "f3"};
+    std::map<std::string, std::string> no_options;
+    ASSERT_OK_AND_ASSIGN(auto created,
+                         TableSchema::Create(/*schema_id=*/0, arrow_schema, partition_names,
+                                             pk_names, no_options));
+    ASSERT_TRUE(created);
+    ASSERT_FALSE(created->CrossPartitionUpdate());
+}
+
+} // namespace paimon::test