wgtmac commented on code in PR #437:
URL: https://github.com/apache/iceberg-cpp/pull/437#discussion_r2647077157


##########
src/iceberg/schema.cc:
##########
@@ -228,4 +243,33 @@ Result<std::vector<std::string>> 
Schema::IdentifierFieldNames() const {
   return names;
 }
 
+Result<int32_t> Schema::HighestFieldId() const { return 
highest_field_id_.Get(*this); }
+
+bool Schema::SameSchema(const Schema& other) const {
+  return fields_ == other.fields_ && identifier_field_ids_ == 
other.identifier_field_ids_;
+}
+
+Status Schema::Validate(int32_t format_version) const {
+  // Get all fields including nested ones
+  ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
+
+  // Check each field's type and defaults
+  for (const auto& [field_id, field_ref] : id_to_field.get()) {
+    const auto& field = field_ref.get();
+
+    // Check if the field's type requires a minimum format version
+    if (auto it = 
TableMetadata::kMinFormatVersions.find(field.type()->type_id());
+        it != TableMetadata::kMinFormatVersions.end()) {
+      if (int32_t min_format_version = it->second; format_version < 
min_format_version) {
+        return InvalidSchema("Invalid type for {}: {} is not supported until 
v{}",
+                             field.name(), *field.type(), min_format_version);
+      }
+    }
+
+    // TODO(GuoTao.yu): Check default values when they are supported

Review Comment:
   Should we port Java's `Schema.validateIdentifierField` and call it from here?
   
   cc @WZhuo 



##########
src/iceberg/table_metadata.cc:
##########
@@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
   return {};
 }
 
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {
+    if (!last_added_schema_id_.has_value()) {
+      return ValidationFailed("Cannot set last added schema: no schema has 
been added");
+    }
+    return SetCurrentSchema(last_added_schema_id_.value());
+  }
+
+  if (metadata_.current_schema_id == schema_id) {
+    return {};
+  }
+
+  auto it = schemas_by_id_.find(schema_id);
+  if (it == schemas_by_id_.end()) {
+    return InvalidArgument("Cannot set current schema to unknown schema: {}", 
schema_id);
+  }
+  const auto& schema = it->second;
+
+  // Rebuild all partition specs for the new current schema
+  std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+  for (const auto& spec : metadata_.partition_specs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, 
*spec));
+    updated_specs.push_back(std::move(updated_spec));
+  }
+  metadata_.partition_specs = std::move(updated_specs);
+  specs_by_id_.clear();
+  for (const auto& spec : metadata_.partition_specs) {
+    specs_by_id_.emplace(spec->spec_id(), spec);
+  }
+
+  // Rebuild all sort orders for the new current schema
+  std::vector<std::shared_ptr<SortOrder>> updated_orders;
+  for (const auto& order : metadata_.sort_orders) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, 
*order));
+    updated_orders.push_back(std::move(updated_order));
+  }
+  metadata_.sort_orders = std::move(updated_orders);
+  sort_orders_by_id_.clear();
+  for (const auto& order : metadata_.sort_orders) {
+    sort_orders_by_id_.emplace(order->order_id(), order);
+  }
+
+  // Set the current schema ID
+  metadata_.current_schema_id = schema_id;
+
+  // Record the change
+  if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == 
schema_id) {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  } else {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+  }
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(
+    const std::unordered_set<int32_t>& schema_ids) {
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
+                   "Cannot remove current schema: {}", current_schema_id);
+
+  if (!schema_ids.empty()) {
+    metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& 
schema) {
+                          return !schema_ids.contains(
+                              
schema->schema_id().value_or(Schema::kInitialSchemaId));

Review Comment:
   ```suggestion
                             return schema->schema_id().has_value() &&
                                    !schema_ids.contains(schema->schema_id().value());
   ```



##########
src/iceberg/table_metadata.h:
##########
@@ -259,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public 
ErrorCollector {
   ///
   /// \param schema The schema to add
   /// \return Reference to this builder for method chaining
-  TableMetadataBuilder& AddSchema(std::shared_ptr<Schema> schema);
+  TableMetadataBuilder& AddSchema(std::shared_ptr<Schema> const& schema);

Review Comment:
   ```suggestion
     TableMetadataBuilder& AddSchema(const std::shared_ptr<Schema>& schema);
   ```



##########
src/iceberg/table_metadata.cc:
##########
@@ -740,6 +898,57 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
   return new_spec_id;
 }
 
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
+    const Schema& new_schema) const {
+  // if the schema already exists, use its id; otherwise use the highest id + 1
+  auto new_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  for (auto& schema : metadata_.schemas) {
+    auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);

Review Comment:
   Should we change `Schema::schema_id` to use `int32_t` instead of `std::optional<int32_t>`? It seems we have to handle `nullopt` everywhere. `PartitionSpec` and `SortOrder` do not use optional IDs, and the Java schema uses 0 as the default. Of course, we can do this in a separate PR.



##########
src/iceberg/partition_spec.cc:
##########
@@ -177,6 +177,41 @@ Status PartitionSpec::Validate(const Schema& schema, bool 
allow_missing_fields)
   return {};
 }
 
+Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {

Review Comment:
   Do you think we need to call this function in the `PartitionSpec::Validate` 
as well?



##########
src/iceberg/table_metadata.h:
##########
@@ -246,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public 
ErrorCollector {
   /// \param schema The schema to set as current
   /// \param new_last_column_id The highest column ID in the schema
   /// \return Reference to this builder for method chaining
-  TableMetadataBuilder& SetCurrentSchema(std::shared_ptr<Schema> schema,
+  TableMetadataBuilder& SetCurrentSchema(std::shared_ptr<Schema> const& schema,

Review Comment:
   ```suggestion
     TableMetadataBuilder& SetCurrentSchema(const std::shared_ptr<Schema>& schema,
   ```



##########
src/iceberg/table_metadata.cc:
##########
@@ -691,6 +830,25 @@ std::unique_ptr<TableMetadata> 
TableMetadataBuilder::Impl::Build() {
             std::chrono::system_clock::now().time_since_epoch())};
   }
 
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);

Review Comment:
   What about returning an error if `metadata_.current_schema_id.has_value() == 
false`?



##########
src/iceberg/test/table_metadata_builder_test.cc:
##########
@@ -386,4 +400,149 @@ TEST(TableMetadataBuilderTest, 
SetDefaultSortOrderInvalid) {
   ASSERT_THAT(builder->Build(), HasErrorMessage("no sort order has been 
added"));
 }
 
+// Test AddSchema
+TEST(TableMetadataBuilderTest, AddSchemaBasic) {
+  auto base = CreateBaseMetadata();
+  auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+  // 1. Add a new schema
+  auto field1 = SchemaField::MakeRequired(4, "new_field1", int64());
+  auto field2 = SchemaField::MakeRequired(5, "new_field2", float64());
+  auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1, 
field2}, 1);
+  builder->AddSchema(new_schema);
+  ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+  ASSERT_EQ(metadata->schemas.size(), 2);
+  EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1);
+  EXPECT_EQ(metadata->last_column_id, 5);
+
+  // 2. Add duplicate schema - should be idempotent
+  builder = TableMetadataBuilder::BuildFrom(base.get());
+  auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1, 
field2}, 1);
+  auto schema2 = std::make_shared<Schema>(std::vector<SchemaField>{field1, 
field2}, 2);
+  builder->AddSchema(schema1);
+  builder->AddSchema(schema2);  // Same fields, should reuse ID
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->schemas.size(), 2);  // Only one new schema added
+
+  // 3. Add multiple different schemas
+  builder = TableMetadataBuilder::BuildFrom(base.get());
+  auto field3 = SchemaField::MakeRequired(6, "field3", string());
+  auto schema3 = std::make_shared<Schema>(std::vector<SchemaField>{field1, 
field2}, 1);
+  auto schema4 = std::make_shared<Schema>(std::vector<SchemaField>{field1, 
field3}, 2);
+  builder->AddSchema(schema3);
+  builder->AddSchema(schema4);
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->schemas.size(), 3);
+  EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1);
+  EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 2);
+  EXPECT_EQ(metadata->last_column_id, 6);
+}
+
+TEST(TableMetadataBuilderTest, AddSchemaInvalid) {

Review Comment:
   Should we remove this test since it tests nothing?



##########
src/iceberg/table_metadata.cc:
##########
@@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
   return {};
 }
 
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {

Review Comment:
   Could you also update `if (order_id == -1)` and `if (spec_id == -1)` in this file to use `kLastAdded` instead of `-1`?



##########
src/iceberg/table_metadata.cc:
##########
@@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
   return {};
 }
 
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {
+    if (!last_added_schema_id_.has_value()) {
+      return ValidationFailed("Cannot set last added schema: no schema has 
been added");
+    }
+    return SetCurrentSchema(last_added_schema_id_.value());
+  }
+
+  if (metadata_.current_schema_id == schema_id) {
+    return {};
+  }
+
+  auto it = schemas_by_id_.find(schema_id);
+  if (it == schemas_by_id_.end()) {
+    return InvalidArgument("Cannot set current schema to unknown schema: {}", 
schema_id);
+  }
+  const auto& schema = it->second;
+
+  // Rebuild all partition specs for the new current schema
+  std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+  for (const auto& spec : metadata_.partition_specs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, 
*spec));
+    updated_specs.push_back(std::move(updated_spec));
+  }
+  metadata_.partition_specs = std::move(updated_specs);
+  specs_by_id_.clear();
+  for (const auto& spec : metadata_.partition_specs) {
+    specs_by_id_.emplace(spec->spec_id(), spec);
+  }
+
+  // Rebuild all sort orders for the new current schema
+  std::vector<std::shared_ptr<SortOrder>> updated_orders;
+  for (const auto& order : metadata_.sort_orders) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, 
*order));
+    updated_orders.push_back(std::move(updated_order));
+  }
+  metadata_.sort_orders = std::move(updated_orders);
+  sort_orders_by_id_.clear();
+  for (const auto& order : metadata_.sort_orders) {
+    sort_orders_by_id_.emplace(order->order_id(), order);
+  }
+
+  // Set the current schema ID
+  metadata_.current_schema_id = schema_id;
+
+  // Record the change
+  if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == 
schema_id) {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  } else {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+  }
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(
+    const std::unordered_set<int32_t>& schema_ids) {
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
+                   "Cannot remove current schema: {}", current_schema_id);
+
+  if (!schema_ids.empty()) {
+    metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& 
schema) {
+                          return !schema_ids.contains(
+                              
schema->schema_id().value_or(Schema::kInitialSchemaId));
+                        }) |
+                        
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+    changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
+  }
+
+  return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+                                                      int32_t 
new_last_column_id) {
+  ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id,
+                   "Invalid last column ID: {} < {} (previous last column ID)",
+                   new_last_column_id, metadata_.last_column_id);
+
+  ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+  auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+  if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() &&
+      new_last_column_id == metadata_.last_column_id) {

Review Comment:
   ```suggestion
     if (schemas_by_id_.contains(new_schema_id) &&
         new_last_column_id == metadata_.last_column_id) {
   ```



##########
src/iceberg/table_metadata.cc:
##########
@@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
   return {};
 }
 
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {
+    if (!last_added_schema_id_.has_value()) {
+      return ValidationFailed("Cannot set last added schema: no schema has 
been added");
+    }
+    return SetCurrentSchema(last_added_schema_id_.value());
+  }
+
+  if (metadata_.current_schema_id == schema_id) {
+    return {};
+  }
+
+  auto it = schemas_by_id_.find(schema_id);
+  if (it == schemas_by_id_.end()) {
+    return InvalidArgument("Cannot set current schema to unknown schema: {}", 
schema_id);
+  }
+  const auto& schema = it->second;
+
+  // Rebuild all partition specs for the new current schema
+  std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+  for (const auto& spec : metadata_.partition_specs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, 
*spec));
+    updated_specs.push_back(std::move(updated_spec));
+  }
+  metadata_.partition_specs = std::move(updated_specs);
+  specs_by_id_.clear();
+  for (const auto& spec : metadata_.partition_specs) {
+    specs_by_id_.emplace(spec->spec_id(), spec);
+  }
+
+  // Rebuild all sort orders for the new current schema
+  std::vector<std::shared_ptr<SortOrder>> updated_orders;
+  for (const auto& order : metadata_.sort_orders) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, 
*order));
+    updated_orders.push_back(std::move(updated_order));
+  }
+  metadata_.sort_orders = std::move(updated_orders);
+  sort_orders_by_id_.clear();
+  for (const auto& order : metadata_.sort_orders) {
+    sort_orders_by_id_.emplace(order->order_id(), order);
+  }
+
+  // Set the current schema ID
+  metadata_.current_schema_id = schema_id;
+
+  // Record the change
+  if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == 
schema_id) {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  } else {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+  }
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(
+    const std::unordered_set<int32_t>& schema_ids) {
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
+                   "Cannot remove current schema: {}", current_schema_id);
+
+  if (!schema_ids.empty()) {
+    metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& 
schema) {
+                          return !schema_ids.contains(
+                              
schema->schema_id().value_or(Schema::kInitialSchemaId));
+                        }) |
+                        
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+    changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
+  }
+
+  return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+                                                      int32_t 
new_last_column_id) {
+  ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id,
+                   "Invalid last column ID: {} < {} (previous last column ID)",
+                   new_last_column_id, metadata_.last_column_id);
+
+  ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+  auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+  if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() &&
+      new_last_column_id == metadata_.last_column_id) {
+    // update last_added_schema_id if the schema was added in this set of 
changes (since
+    // it is now the last)
+    bool is_new_schema =
+        last_added_schema_id_.has_value() &&
+        std::ranges::any_of(changes_, [new_schema_id](const auto& change) {
+          if (change->kind() != TableUpdate::Kind::kAddSchema) {
+            return false;
+          }
+          auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
+          return 
add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
+                 new_schema_id;
+        });
+    last_added_schema_id_ =
+        is_new_schema ? std::make_optional(new_schema_id) : std::nullopt;
+    return new_schema_id;
+  }
+
+  auto new_schema =
+      std::make_shared<Schema>(schema.fields() | 
std::ranges::to<std::vector>(),
+                               new_schema_id, schema.IdentifierFieldIds());
+
+  metadata_.schemas.push_back(new_schema);

Review Comment:
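   If `new_schema_id` is already in `metadata_.schemas` but `new_last_column_id` differs from `metadata_.last_column_id`, the early return above is skipped and execution reaches this `push_back`. Isn't the schema then added twice?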



##########
src/iceberg/table_metadata.cc:
##########
@@ -691,6 +830,25 @@ std::unique_ptr<TableMetadata> 
TableMetadataBuilder::Impl::Build() {
             std::chrono::system_clock::now().time_since_epoch())};
   }
 
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  auto schema_it = schemas_by_id_.find(current_schema_id);
+  ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(),
+                   "Current schema ID {} not found in schemas", 
current_schema_id);
+  const auto& current_schema = schema_it->second;
+  {
+    auto spec_it = specs_by_id_.find(metadata_.default_spec_id);
+    // FIXME(GuoTao.yu): Default spec must exist after we support update 
partition spec

Review Comment:
   I think we should return an error if `spec_it` is not found, just as we do for `sort_order_it`.



##########
src/iceberg/table_metadata.cc:
##########
@@ -740,6 +898,57 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
   return new_spec_id;
 }
 
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
+    const Schema& new_schema) const {
+  // if the schema already exists, use its id; otherwise use the highest id + 1
+  auto new_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  for (auto& schema : metadata_.schemas) {
+    auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
+    if (schema->SameSchema(new_schema)) {
+      return schema_id;
+    } else if (new_schema_id <= schema_id) {
+      new_schema_id = schema_id + 1;
+    }
+  }
+  return new_schema_id;
+}
+
+Result<std::shared_ptr<PartitionSpec>> 
TableMetadataBuilder::Impl::UpdateSpecSchema(
+    const Schema& schema, const PartitionSpec& partition_spec) {
+  // UpdateSpecSchema: Update partition spec to use the new schema
+  // This preserves the partition spec structure but rebinds it to the new 
schema
+
+  // Copy all fields from the partition spec. IDs should not change.
+  std::vector<PartitionField> fields;
+  fields.reserve(partition_spec.fields().size());
+  int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart;

Review Comment:
   We don't need to maintain `last_assigned_field_id` here because `PartitionSpec::Make` handles this internally.



##########
src/iceberg/table_metadata.cc:
##########
@@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
   return {};
 }
 
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {
+    if (!last_added_schema_id_.has_value()) {
+      return ValidationFailed("Cannot set last added schema: no schema has 
been added");
+    }
+    return SetCurrentSchema(last_added_schema_id_.value());
+  }
+
+  if (metadata_.current_schema_id == schema_id) {
+    return {};
+  }
+
+  auto it = schemas_by_id_.find(schema_id);
+  if (it == schemas_by_id_.end()) {
+    return InvalidArgument("Cannot set current schema to unknown schema: {}", 
schema_id);
+  }
+  const auto& schema = it->second;
+
+  // Rebuild all partition specs for the new current schema
+  std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+  for (const auto& spec : metadata_.partition_specs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, 
*spec));
+    updated_specs.push_back(std::move(updated_spec));
+  }
+  metadata_.partition_specs = std::move(updated_specs);
+  specs_by_id_.clear();
+  for (const auto& spec : metadata_.partition_specs) {
+    specs_by_id_.emplace(spec->spec_id(), spec);
+  }
+
+  // Rebuild all sort orders for the new current schema
+  std::vector<std::shared_ptr<SortOrder>> updated_orders;
+  for (const auto& order : metadata_.sort_orders) {
+    ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, 
*order));
+    updated_orders.push_back(std::move(updated_order));
+  }
+  metadata_.sort_orders = std::move(updated_orders);
+  sort_orders_by_id_.clear();
+  for (const auto& order : metadata_.sort_orders) {
+    sort_orders_by_id_.emplace(order->order_id(), order);
+  }
+
+  // Set the current schema ID
+  metadata_.current_schema_id = schema_id;
+
+  // Record the change
+  if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == 
schema_id) {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  } else {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+  }
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveSchemas(
+    const std::unordered_set<int32_t>& schema_ids) {
+  auto current_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
+                   "Cannot remove current schema: {}", current_schema_id);
+
+  if (!schema_ids.empty()) {
+    metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& 
schema) {
+                          return !schema_ids.contains(
+                              
schema->schema_id().value_or(Schema::kInitialSchemaId));
+                        }) |
+                        
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
+    changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
+  }
+
+  return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+                                                      int32_t 
new_last_column_id) {
+  ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id,
+                   "Invalid last column ID: {} < {} (previous last column ID)",
+                   new_last_column_id, metadata_.last_column_id);
+
+  ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+  auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+  if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() &&
+      new_last_column_id == metadata_.last_column_id) {
+    // update last_added_schema_id if the schema was added in this set of 
changes (since
+    // it is now the last)
+    bool is_new_schema =
+        last_added_schema_id_.has_value() &&
+        std::ranges::any_of(changes_, [new_schema_id](const auto& change) {
+          if (change->kind() != TableUpdate::Kind::kAddSchema) {
+            return false;
+          }
+          auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
+          return 
add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
+                 new_schema_id;

Review Comment:
   ```suggestion
             auto* add_schema = internal::checked_cast<table::AddSchema*>(change.get());
             return add_schema->schema()->schema_id() == std::make_optional(new_schema_id);
   ```



##########
src/iceberg/table_metadata.cc:
##########
@@ -740,6 +898,57 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
   return new_spec_id;
 }
 
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
+    const Schema& new_schema) const {
+  // if the schema already exists, use its id; otherwise use the highest id + 1
+  auto new_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  for (auto& schema : metadata_.schemas) {
+    auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
+    if (schema->SameSchema(new_schema)) {
+      return schema_id;
+    } else if (new_schema_id <= schema_id) {
+      new_schema_id = schema_id + 1;
+    }
+  }
+  return new_schema_id;
+}
+
+Result<std::shared_ptr<PartitionSpec>> 
TableMetadataBuilder::Impl::UpdateSpecSchema(
+    const Schema& schema, const PartitionSpec& partition_spec) {
+  // UpdateSpecSchema: Update partition spec to use the new schema
+  // This preserves the partition spec structure but rebinds it to the new 
schema
+
+  // Copy all fields from the partition spec. IDs should not change.
+  std::vector<PartitionField> fields;
+  fields.reserve(partition_spec.fields().size());
+  int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
+  for (const auto& field : partition_spec.fields()) {
+    fields.push_back(field);
+    last_assigned_field_id = std::max(last_assigned_field_id, 
field.field_id());
+  }
+
+  // Build without validation because the schema may have changed in a way that
+  // makes this spec invalid. The spec should still be preserved so that older
+  // metadata can be interpreted.
+  ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec,
+                          PartitionSpec::Make(partition_spec.spec_id(), 
std::move(fields),
+                                              last_assigned_field_id));
+
+  // Validate the new partition name against the new schema
+  ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema));

Review Comment:
   It seems we can always call `ValidatePartitionName` in `PartitionSpec::Make`. Consider making it a static function so that `Validate` can call it as well.



##########
src/iceberg/table_metadata.cc:
##########
@@ -740,6 +898,57 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
   return new_spec_id;
 }
 
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
+    const Schema& new_schema) const {
+  // if the schema already exists, use its id; otherwise use the highest id + 1
+  auto new_schema_id = 
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  for (auto& schema : metadata_.schemas) {
+    auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
+    if (schema->SameSchema(new_schema)) {
+      return schema_id;
+    } else if (new_schema_id <= schema_id) {
+      new_schema_id = schema_id + 1;
+    }
+  }
+  return new_schema_id;
+}
+
+Result<std::shared_ptr<PartitionSpec>> 
TableMetadataBuilder::Impl::UpdateSpecSchema(
+    const Schema& schema, const PartitionSpec& partition_spec) {
+  // UpdateSpecSchema: Update partition spec to use the new schema
+  // This preserves the partition spec structure but rebinds it to the new 
schema
+
+  // Copy all fields from the partition spec. IDs should not change.
+  std::vector<PartitionField> fields;
+  fields.reserve(partition_spec.fields().size());
+  int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
+  for (const auto& field : partition_spec.fields()) {
+    fields.push_back(field);
+    last_assigned_field_id = std::max(last_assigned_field_id, 
field.field_id());
+  }
+
+  // Build without validation because the schema may have changed in a way that
+  // makes this spec invalid. The spec should still be preserved so that older
+  // metadata can be interpreted.
+  ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec,
+                          PartitionSpec::Make(partition_spec.spec_id(), 
std::move(fields),
+                                              last_assigned_field_id));
+
+  // Validate the new partition name against the new schema
+  ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema));
+  return new_partition_spec;
+}
+
+Result<std::unique_ptr<SortOrder>> 
TableMetadataBuilder::Impl::UpdateSortOrderSchema(

Review Comment:
   I think the key difference between the C++ and Java implementations is that we don't store a `Schema` along with `PartitionSpec` and `SortOrder`. So we don't even need the `UpdateSortOrderSchema` and `UpdateSpecSchema` functions, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to