This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new bc2e0266 feat: add move column to update schema (#517)
bc2e0266 is described below

commit bc2e0266eb428d058c1cf2e4d957ad6cb980a649
Author: Guotao Yu <[email protected]>
AuthorDate: Tue Jan 20 10:00:57 2026 +0800

    feat: add move column to update schema (#517)
---
 src/iceberg/test/update_schema_test.cc | 937 ++++++++++++++++++++++++++++-----
 src/iceberg/update/update_schema.cc    | 165 +++++-
 src/iceberg/update/update_schema.h     |  23 +
 src/iceberg/util/error_collector.h     |  11 +
 4 files changed, 979 insertions(+), 157 deletions(-)

diff --git a/src/iceberg/test/update_schema_test.cc b/src/iceberg/test/update_schema_test.cc
index 1c625bf3..8550c8b5 100644
--- a/src/iceberg/test/update_schema_test.cc
+++ b/src/iceberg/test/update_schema_test.cc
@@ -28,12 +28,14 @@
 #include "iceberg/test/matchers.h"
 #include "iceberg/test/update_test_base.h"
 #include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
 
 namespace iceberg {
 
+using internal::checked_cast;
+
 class UpdateSchemaTest : public UpdateTestBase {};
 
-// Test adding a simple optional column
 TEST_F(UpdateSchemaTest, AddOptionalColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("new_col", int32(), "A new integer column");
@@ -51,7 +53,6 @@ TEST_F(UpdateSchemaTest, AddOptionalColumn) {
   EXPECT_EQ(new_field.doc(), "A new integer column");
 }
 
-// Test adding a required column (should fail without AllowIncompatibleChanges)
 TEST_F(UpdateSchemaTest, AddRequiredColumnFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddRequiredColumn("required_col", string(), "A required string 
column");
@@ -61,7 +62,6 @@ TEST_F(UpdateSchemaTest, AddRequiredColumnFails) {
   EXPECT_THAT(result, HasErrorMessage("Incompatible change"));
 }
 
-// Test adding a required column with AllowIncompatibleChanges
 TEST_F(UpdateSchemaTest, AddRequiredColumnWithAllowIncompatible) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AllowIncompatibleChanges().AddRequiredColumn("required_col", 
string(),
@@ -81,7 +81,6 @@ TEST_F(UpdateSchemaTest, AddRequiredColumnWithAllowIncompatible) {
   EXPECT_EQ(new_field.doc(), "A required string column");
 }
 
-// Test adding multiple columns
 TEST_F(UpdateSchemaTest, AddMultipleColumns) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("col1", int32(), "First column")
@@ -104,7 +103,6 @@ TEST_F(UpdateSchemaTest, AddMultipleColumns) {
   EXPECT_EQ(col3_opt->get().type(), boolean());
 }
 
-// Test adding column with dot in name should fail for top-level
 TEST_F(UpdateSchemaTest, AddColumnWithDotInNameFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("col.with.dots", int32(), "Column with dots");
@@ -114,7 +112,6 @@ TEST_F(UpdateSchemaTest, AddColumnWithDotInNameFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot add column with ambiguous 
name"));
 }
 
-// Test adding column to nested struct
 TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
   auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(100, "nested_field", int32(), true, "Nested field")});
@@ -137,7 +134,7 @@ TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
   const auto& struct_field = struct_field_opt->get();
   ASSERT_TRUE(struct_field.type()->is_nested());
 
-  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_struct = checked_cast<const 
StructType&>(*struct_field.type());
   ICEBERG_UNWRAP_OR_FAIL(auto nested_field_opt,
                          nested_struct.GetFieldByName("new_nested_field"));
   ASSERT_TRUE(nested_field_opt.has_value());
@@ -147,7 +144,6 @@ TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
   EXPECT_EQ(nested_field.type(), string());
 }
 
-// Test adding column to non-existent parent fails
 TEST_F(UpdateSchemaTest, AddColumnToNonExistentParentFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("non_existent_parent", "new_field", int32(), "New field");
@@ -157,7 +153,6 @@ TEST_F(UpdateSchemaTest, AddColumnToNonExistentParentFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot find parent struct"));
 }
 
-// Test adding column to non-struct parent fails
 TEST_F(UpdateSchemaTest, AddColumnToNonStructParentFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("primitive_col", int32(), "A primitive column");
@@ -172,7 +167,6 @@ TEST_F(UpdateSchemaTest, AddColumnToNonStructParentFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot add to non-struct column"));
 }
 
-// Test adding duplicate column name fails
 TEST_F(UpdateSchemaTest, AddDuplicateColumnNameFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("duplicate_col", int32(), "First column")
@@ -183,7 +177,6 @@ TEST_F(UpdateSchemaTest, AddDuplicateColumnNameFails) {
   EXPECT_THAT(result, HasErrorMessage("Duplicate path found"));
 }
 
-// Test column ID assignment
 TEST_F(UpdateSchemaTest, ColumnIdAssignment) {
   ICEBERG_UNWRAP_OR_FAIL(auto original_schema, table_->schema());
   int32_t original_last_id = table_->metadata()->last_column_id;
@@ -206,7 +199,6 @@ TEST_F(UpdateSchemaTest, ColumnIdAssignment) {
   EXPECT_EQ(col2_opt->get().field_id(), original_last_id + 2);
 }
 
-// Test adding nested struct with multiple fields
 TEST_F(UpdateSchemaTest, AddNestedStructColumn) {
   auto nested_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(100, "field1", int32(), true, "First nested field"),
@@ -227,7 +219,7 @@ TEST_F(UpdateSchemaTest, AddNestedStructColumn) {
   EXPECT_TRUE(struct_field.type()->is_nested());
   EXPECT_TRUE(struct_field.optional());
 
-  const auto& nested_type = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_type = checked_cast<const 
StructType&>(*struct_field.type());
   EXPECT_EQ(nested_type.fields().size(), 2);
 
   ICEBERG_UNWRAP_OR_FAIL(auto field1_opt, 
nested_type.GetFieldByName("field1"));
@@ -242,7 +234,6 @@ TEST_F(UpdateSchemaTest, AddNestedStructColumn) {
   EXPECT_FALSE(field2_opt->get().optional());
 }
 
-// Test case sensitivity
 TEST_F(UpdateSchemaTest, CaseSensitiveColumnNames) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(true)
@@ -262,7 +253,6 @@ TEST_F(UpdateSchemaTest, CaseSensitiveColumnNames) {
   EXPECT_EQ(lower_opt->get().type(), string());
 }
 
-// Test case insensitive duplicate detection
 TEST_F(UpdateSchemaTest, CaseInsensitiveDuplicateDetection) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(false)
@@ -274,7 +264,6 @@ TEST_F(UpdateSchemaTest, CaseInsensitiveDuplicateDetection) {
   EXPECT_THAT(result, HasErrorMessage("Duplicate path found"));
 }
 
-// Test empty update
 TEST_F(UpdateSchemaTest, EmptyUpdate) {
   ICEBERG_UNWRAP_OR_FAIL(auto original_schema, table_->schema());
 
@@ -285,7 +274,6 @@ TEST_F(UpdateSchemaTest, EmptyUpdate) {
   EXPECT_EQ(result.new_last_column_id, table_->metadata()->last_column_id);
 }
 
-// Test commit success
 TEST_F(UpdateSchemaTest, CommitSuccess) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("committed_col", int64(), "A committed column");
@@ -305,7 +293,6 @@ TEST_F(UpdateSchemaTest, CommitSuccess) {
   EXPECT_EQ(field.doc(), "A committed column");
 }
 
-// Test adding fields to map value and list element structs
 TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
   auto map_key_struct = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(20, "address", string(), false),
@@ -342,8 +329,8 @@ TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
   const auto& locations_field = locations_opt->get();
   ASSERT_EQ(locations_field.type()->type_id(), TypeId::kMap);
 
-  const auto& map = static_cast<const MapType&>(*locations_field.type());
-  const auto& value_struct = static_cast<const 
StructType&>(*map.value().type());
+  const auto& map = checked_cast<const MapType&>(*locations_field.type());
+  const auto& value_struct = checked_cast<const 
StructType&>(*map.value().type());
   ICEBERG_UNWRAP_OR_FAIL(auto alt_opt, value_struct.GetFieldByName("alt"));
   ASSERT_TRUE(alt_opt.has_value());
   EXPECT_EQ(alt_opt->get().type(), float32());
@@ -353,14 +340,13 @@ TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
   const auto& points_field = points_opt->get();
   ASSERT_EQ(points_field.type()->type_id(), TypeId::kList);
 
-  const auto& list = static_cast<const ListType&>(*points_field.type());
-  const auto& element_struct = static_cast<const 
StructType&>(*list.element().type());
+  const auto& list = checked_cast<const ListType&>(*points_field.type());
+  const auto& element_struct = checked_cast<const 
StructType&>(*list.element().type());
   ICEBERG_UNWRAP_OR_FAIL(auto z_opt, element_struct.GetFieldByName("z"));
   ASSERT_TRUE(z_opt.has_value());
   EXPECT_EQ(z_opt->get().type(), int64());
 }
 
-// Test adding nested struct with ID reassignment
 TEST_F(UpdateSchemaTest, AddNestedStructWithIdReassignment) {
   auto nested_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(1, "lat", int32(), false), SchemaField(2, "long", int32(), 
true)});
@@ -376,7 +362,7 @@ TEST_F(UpdateSchemaTest, AddNestedStructWithIdReassignment) {
   const auto& location_field = location_opt->get();
   ASSERT_TRUE(location_field.type()->is_nested());
 
-  const auto& struct_type = static_cast<const 
StructType&>(*location_field.type());
+  const auto& struct_type = checked_cast<const 
StructType&>(*location_field.type());
   ASSERT_EQ(struct_type.fields().size(), 2);
 
   ICEBERG_UNWRAP_OR_FAIL(auto lat_opt, struct_type.GetFieldByName("lat"));
@@ -389,7 +375,6 @@ TEST_F(UpdateSchemaTest, AddNestedStructWithIdReassignment) {
   EXPECT_GT(long_opt->get().field_id(), 1);
 }
 
-// Test adding nested map of structs with ID reassignment
 TEST_F(UpdateSchemaTest, AddNestedMapOfStructs) {
   auto key_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(20, "address", string(), false),
@@ -413,12 +398,12 @@ TEST_F(UpdateSchemaTest, AddNestedMapOfStructs) {
   const auto& locations_field = locations_opt->get();
   ASSERT_EQ(locations_field.type()->type_id(), TypeId::kMap);
 
-  const auto& map = static_cast<const MapType&>(*locations_field.type());
+  const auto& map = checked_cast<const MapType&>(*locations_field.type());
 
-  const auto& key_struct_type = static_cast<const 
StructType&>(*map.key().type());
+  const auto& key_struct_type = checked_cast<const 
StructType&>(*map.key().type());
   EXPECT_EQ(key_struct_type.fields().size(), 4);
 
-  const auto& value_struct_type = static_cast<const 
StructType&>(*map.value().type());
+  const auto& value_struct_type = checked_cast<const 
StructType&>(*map.value().type());
   EXPECT_EQ(value_struct_type.fields().size(), 2);
 
   ICEBERG_UNWRAP_OR_FAIL(auto lat_opt, 
value_struct_type.GetFieldByName("lat"));
@@ -428,7 +413,6 @@ TEST_F(UpdateSchemaTest, AddNestedMapOfStructs) {
   ASSERT_TRUE(long_opt.has_value());
 }
 
-// Test adding nested list of structs with ID reassignment
 TEST_F(UpdateSchemaTest, AddNestedListOfStructs) {
   auto element_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(9, "lat", int32(), false), SchemaField(8, "long", int32(), 
true)});
@@ -447,9 +431,9 @@ TEST_F(UpdateSchemaTest, AddNestedListOfStructs) {
   const auto& locations_field = locations_opt->get();
   ASSERT_EQ(locations_field.type()->type_id(), TypeId::kList);
 
-  const auto& list = static_cast<const ListType&>(*locations_field.type());
+  const auto& list = checked_cast<const ListType&>(*locations_field.type());
   const auto& element_struct_type =
-      static_cast<const StructType&>(*list.element().type());
+      checked_cast<const StructType&>(*list.element().type());
 
   EXPECT_EQ(element_struct_type.fields().size(), 2);
 
@@ -460,7 +444,6 @@ TEST_F(UpdateSchemaTest, AddNestedListOfStructs) {
   ASSERT_TRUE(long_opt.has_value());
 }
 
-// Test adding field with dots in name to nested struct
 TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true)});
@@ -480,7 +463,7 @@ TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
   ASSERT_TRUE(struct_field_opt.has_value());
 
   const auto& struct_field = struct_field_opt->get();
-  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_struct = checked_cast<const 
StructType&>(*struct_field.type());
 
   ICEBERG_UNWRAP_OR_FAIL(auto dotted_field_opt,
                          nested_struct.GetFieldByName("field.with.dots"));
@@ -489,7 +472,6 @@ TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
   EXPECT_EQ(dotted_field_opt->get().type(), int64());
 }
 
-// Test adding field to map key should fail
 TEST_F(UpdateSchemaTest, AddFieldToMapKeyFails) {
   auto key_struct = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(20, "address", string(), false)});
@@ -514,7 +496,6 @@ TEST_F(UpdateSchemaTest, AddFieldToMapKeyFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot add fields to map keys"));
 }
 
-// Test deleting a column
 TEST_F(UpdateSchemaTest, DeleteColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("to_delete", string(), "A column to delete");
@@ -530,7 +511,6 @@ TEST_F(UpdateSchemaTest, DeleteColumn) {
   EXPECT_FALSE(field_opt.has_value());
 }
 
-// Test deleting a nested column
 TEST_F(UpdateSchemaTest, DeleteNestedColumn) {
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true),
@@ -551,7 +531,7 @@ TEST_F(UpdateSchemaTest, DeleteNestedColumn) {
   ASSERT_TRUE(struct_field_opt.has_value());
 
   const auto& struct_field = struct_field_opt->get();
-  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_struct = checked_cast<const 
StructType&>(*struct_field.type());
 
   ICEBERG_UNWRAP_OR_FAIL(auto field1_opt, 
nested_struct.GetFieldByName("field1"));
   ICEBERG_UNWRAP_OR_FAIL(auto field2_opt, 
nested_struct.GetFieldByName("field2"));
@@ -560,7 +540,6 @@ TEST_F(UpdateSchemaTest, DeleteNestedColumn) {
   EXPECT_TRUE(field2_opt.has_value());
 }
 
-// Test deleting missing column fails
 TEST_F(UpdateSchemaTest, DeleteMissingColumnFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->DeleteColumn("non_existent");
@@ -570,7 +549,6 @@ TEST_F(UpdateSchemaTest, DeleteMissingColumnFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete missing column"));
 }
 
-// Test delete then add same column
 TEST_F(UpdateSchemaTest, DeleteThenAdd) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges().AddRequiredColumn("col", int32(),
@@ -591,7 +569,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAdd) {
   EXPECT_TRUE(field.optional());
 }
 
-// Test delete then add nested field
 TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", boolean(), false)});
@@ -612,14 +589,13 @@ TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
   ASSERT_TRUE(struct_field_opt.has_value());
 
   const auto& struct_field = struct_field_opt->get();
-  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_struct = checked_cast<const 
StructType&>(*struct_field.type());
 
   ICEBERG_UNWRAP_OR_FAIL(auto field1_opt, 
nested_struct.GetFieldByName("field1"));
   ASSERT_TRUE(field1_opt.has_value());
   EXPECT_EQ(field1_opt->get().type(), int32());
 }
 
-// Test add-delete conflict
 TEST_F(UpdateSchemaTest, AddDeleteConflict) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("new_col", int32()).DeleteColumn("new_col");
@@ -629,7 +605,6 @@ TEST_F(UpdateSchemaTest, AddDeleteConflict) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete missing column"));
 }
 
-// Test delete column that has additions fails
 TEST_F(UpdateSchemaTest, DeleteColumnWithAdditionsFails) {
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true)});
@@ -647,7 +622,6 @@ TEST_F(UpdateSchemaTest, DeleteColumnWithAdditionsFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete a column that has 
additions"));
 }
 
-// Test delete map key fails
 TEST_F(UpdateSchemaTest, DeleteMapKeyFails) {
   auto map_type = std::make_shared<MapType>(SchemaField(10, "key", string(), 
false),
                                             SchemaField(11, "value", int32(), 
true));
@@ -665,7 +639,6 @@ TEST_F(UpdateSchemaTest, DeleteMapKeyFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete map keys"));
 }
 
-// Test case insensitive delete
 TEST_F(UpdateSchemaTest, DeleteColumnCaseInsensitive) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("MyColumn", string(), "A column with mixed case");
@@ -682,7 +655,6 @@ TEST_F(UpdateSchemaTest, DeleteColumnCaseInsensitive) {
   EXPECT_FALSE(field_opt.has_value());
 }
 
-// Test renaming a column
 TEST_F(UpdateSchemaTest, RenameColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("old_name", string(), "A column to rename");
@@ -702,7 +674,6 @@ TEST_F(UpdateSchemaTest, RenameColumn) {
   EXPECT_EQ(*new_field_opt->get().type(), *string());
 }
 
-// Test renaming nested column
 TEST_F(UpdateSchemaTest, RenameNestedColumn) {
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true),
@@ -723,7 +694,7 @@ TEST_F(UpdateSchemaTest, RenameNestedColumn) {
   ASSERT_TRUE(struct_field_opt.has_value());
 
   const auto& struct_field = struct_field_opt->get();
-  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+  const auto& nested_struct = checked_cast<const 
StructType&>(*struct_field.type());
 
   ICEBERG_UNWRAP_OR_FAIL(auto field1_opt, 
nested_struct.GetFieldByName("field1"));
   ICEBERG_UNWRAP_OR_FAIL(auto renamed_opt, 
nested_struct.GetFieldByName("renamed_field"));
@@ -733,7 +704,6 @@ TEST_F(UpdateSchemaTest, RenameNestedColumn) {
   EXPECT_EQ(*renamed_opt->get().type(), *int32());
 }
 
-// Test renaming column with dots in new name
 TEST_F(UpdateSchemaTest, RenameColumnWithDotsInName) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("simple_name", string());
@@ -751,7 +721,6 @@ TEST_F(UpdateSchemaTest, RenameColumnWithDotsInName) {
   EXPECT_EQ(*new_field_opt->get().type(), *string());
 }
 
-// Test rename missing column fails
 TEST_F(UpdateSchemaTest, RenameMissingColumnFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->RenameColumn("non_existent", "new_name");
@@ -761,7 +730,6 @@ TEST_F(UpdateSchemaTest, RenameMissingColumnFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
 }
 
-// Test rename column that will be deleted fails
 TEST_F(UpdateSchemaTest, RenameDeletedColumnFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", string());
@@ -776,7 +744,6 @@ TEST_F(UpdateSchemaTest, RenameDeletedColumnFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
 }
 
-// Test case insensitive rename
 TEST_F(UpdateSchemaTest, RenameColumnCaseInsensitive) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("MyColumn", string(), "A column with mixed case");
@@ -797,7 +764,6 @@ TEST_F(UpdateSchemaTest, RenameColumnCaseInsensitive) {
   ASSERT_TRUE(new_field_opt.has_value());
 }
 
-// Test rename then delete with old name fails
 TEST_F(UpdateSchemaTest, RenameThenDeleteOldNameFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("old_name", string());
@@ -812,7 +778,6 @@ TEST_F(UpdateSchemaTest, RenameThenDeleteOldNameFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete a column that has 
updates"));
 }
 
-// Test rename then delete with new name fails
 TEST_F(UpdateSchemaTest, RenameThenDeleteNewNameFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("old_name", string());
@@ -827,7 +792,6 @@ TEST_F(UpdateSchemaTest, RenameThenDeleteNewNameFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete missing column"));
 }
 
-// Test rename then add with old name
 TEST_F(UpdateSchemaTest, RenameThenAddWithOldName) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("old_name", int32());
@@ -842,7 +806,6 @@ TEST_F(UpdateSchemaTest, RenameThenAddWithOldName) {
   EXPECT_THAT(result, HasErrorMessage("Cannot add column, name already 
exists"));
 }
 
-// Test add then rename - should fail because RenameColumn only works on 
existing fields
 TEST_F(UpdateSchemaTest, AddThenRename) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("temp_name", string()).RenameColumn("temp_name", 
"final_name");
@@ -852,7 +815,6 @@ TEST_F(UpdateSchemaTest, AddThenRename) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
 }
 
-// Test delete then add then rename - should fail
 TEST_F(UpdateSchemaTest, DeleteThenAddThenRename) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32());
@@ -869,7 +831,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAddThenRename) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
 }
 
-// Test making a column optional
 TEST_F(UpdateSchemaTest, MakeColumnOptional) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges().AddRequiredColumn("id", int32());
@@ -886,7 +847,6 @@ TEST_F(UpdateSchemaTest, MakeColumnOptional) {
   EXPECT_TRUE(field_opt->get().optional());
 }
 
-// Test requiring a column
 TEST_F(UpdateSchemaTest, RequireColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("id", int32());
@@ -912,7 +872,6 @@ TEST_F(UpdateSchemaTest, RequireColumn) {
   EXPECT_FALSE(field_opt->get().optional());
 }
 
-// Test requiring an already required column (noop)
 TEST_F(UpdateSchemaTest, RequireColumnNoop) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges().AddRequiredColumn("id", int32());
@@ -929,7 +888,6 @@ TEST_F(UpdateSchemaTest, RequireColumnNoop) {
   EXPECT_FALSE(field_opt->get().optional());
 }
 
-// Test making an already optional column optional (noop)
 TEST_F(UpdateSchemaTest, MakeColumnOptionalNoop) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("id", int32());
@@ -946,7 +904,6 @@ TEST_F(UpdateSchemaTest, MakeColumnOptionalNoop) {
   EXPECT_TRUE(field_opt->get().optional());
 }
 
-// Test case insensitive require column
 TEST_F(UpdateSchemaTest, RequireColumnCaseInsensitive) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("ID", int32());
@@ -963,7 +920,6 @@ TEST_F(UpdateSchemaTest, RequireColumnCaseInsensitive) {
   EXPECT_FALSE(field_opt->get().optional());
 }
 
-// Test make column optional on missing column fails
 TEST_F(UpdateSchemaTest, MakeColumnOptionalMissingFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->MakeColumnOptional("non_existent");
@@ -973,7 +929,6 @@ TEST_F(UpdateSchemaTest, MakeColumnOptionalMissingFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
 }
 
-// Test require column on missing column fails
 TEST_F(UpdateSchemaTest, RequireColumnMissingFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AllowIncompatibleChanges().RequireColumn("non_existent");
@@ -983,7 +938,6 @@ TEST_F(UpdateSchemaTest, RequireColumnMissingFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
 }
 
-// Test make column optional on deleted column fails
 TEST_F(UpdateSchemaTest, MakeColumnOptionalDeletedFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges().AddRequiredColumn("col", int32());
@@ -998,7 +952,6 @@ TEST_F(UpdateSchemaTest, MakeColumnOptionalDeletedFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
 }
 
-// Test require column on deleted column fails
 TEST_F(UpdateSchemaTest, RequireColumnDeletedFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32());
@@ -1013,22 +966,18 @@ TEST_F(UpdateSchemaTest, RequireColumnDeletedFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
 }
 
-// Test update column doc (in same transaction as add)
 TEST_F(UpdateSchemaTest, UpdateColumnDoc) {
-  // Add a column and update its doc in the same transaction
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("col", int32(), "original doc");
   update->UpdateColumnDoc("col", "updated doc");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the doc was updated
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(field_opt->get().doc(), "updated doc");
 }
 
-// Test update column doc on missing column fails
 TEST_F(UpdateSchemaTest, UpdateColumnDocMissingFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->UpdateColumnDoc("non_existent", "some doc");
@@ -1038,7 +987,6 @@ TEST_F(UpdateSchemaTest, UpdateColumnDocMissingFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
 }
 
-// Test update column doc on deleted column fails
 TEST_F(UpdateSchemaTest, UpdateColumnDocDeletedFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32(), "original doc");
@@ -1053,7 +1001,6 @@ TEST_F(UpdateSchemaTest, UpdateColumnDocDeletedFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
 }
 
-// Test update column doc noop (same doc)
 TEST_F(UpdateSchemaTest, UpdateColumnDocNoop) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32(), "same doc");
@@ -1070,68 +1017,55 @@ TEST_F(UpdateSchemaTest, UpdateColumnDocNoop) {
   EXPECT_EQ(field_opt->get().doc(), "same doc");
 }
 
-// Test update column doc with empty string
 TEST_F(UpdateSchemaTest, UpdateColumnDocEmptyString) {
-  // Add a column with a doc and clear it in same transaction
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("col", int32(), "original doc");
   update->UpdateColumnDoc("col", "");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the doc was cleared
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(field_opt->get().doc(), "");
 }
 
-// Test update column int to long
 TEST_F(UpdateSchemaTest, UpdateColumnIntToLong) {
-  // Add an int column and update to long in same transaction
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("id", int32(), "An integer ID");
   update->UpdateColumn("id", int64());
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the type was updated
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(*field_opt->get().type(), *int64());
   EXPECT_EQ(field_opt->get().doc(), "An integer ID");  // Doc preserved
 }
 
-// Test update column float to double
 TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) {
-  // Add a float column and update to double in same transaction
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("value", float32(), "A float value");
   update->UpdateColumn("value", float64());
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the type was updated
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("value"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(*field_opt->get().type(), *float64());
 }
 
-// Test update column with same type (noop)
 TEST_F(UpdateSchemaTest, UpdateColumnSameType) {
-  // Add a column and update with same type (should be a noop)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("id", int32());
   update->UpdateColumn("id", int32());
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the type is still int32
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(*field_opt->get().type(), *int32());
 }
 
-// Test update column on missing column fails
 TEST_F(UpdateSchemaTest, UpdateColumnMissingFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->UpdateColumn("non_existent", int64());
@@ -1141,7 +1075,6 @@ TEST_F(UpdateSchemaTest, UpdateColumnMissingFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
 }
 
-// Test update column on deleted column fails
 TEST_F(UpdateSchemaTest, UpdateColumnDeletedFails) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32());
@@ -1156,9 +1089,7 @@ TEST_F(UpdateSchemaTest, UpdateColumnDeletedFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
 }
 
-// Test update column with invalid promotion fails (long to int)
 TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionFails) {
-  // Add a long column and try to downgrade to int (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("id", int64());
   update->UpdateColumn("id", int32());
@@ -1168,9 +1099,7 @@ TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
 }
 
-// Test update column with invalid promotion fails (double to float)
 TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionDoubleToFloatFails) {
-  // Add a double column and try to downgrade to float (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("value", float64());
   update->UpdateColumn("value", float32());
@@ -1180,9 +1109,7 @@ TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionDoubleToFloatFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
 }
 
-// Test update column with incompatible types fails (int to string)
 TEST_F(UpdateSchemaTest, UpdateColumnIncompatibleTypesFails) {
-  // Add an int column and try to change to string (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("id", int32());
   update->UpdateColumn("id", string());
@@ -1192,7 +1119,6 @@ TEST_F(UpdateSchemaTest, UpdateColumnIncompatibleTypesFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
 }
 
-// Test rename and update column type in same transaction - should fail
 TEST_F(UpdateSchemaTest, RenameAndUpdateColumnInSameTransaction) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("old_name", int32());
@@ -1204,9 +1130,7 @@ TEST_F(UpdateSchemaTest, RenameAndUpdateColumnInSameTransaction) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
 }
 
-// Test decimal precision widening
 TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionWidening) {
-  // Add a decimal column and widen the precision
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   auto decimal_10_2 = decimal(10, 2);
   auto decimal_20_2 = decimal(20, 2);
@@ -1215,15 +1139,12 @@ TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionWidening) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the precision was widened
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("price"));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_EQ(*field_opt->get().type(), *decimal_20_2);
 }
 
-// Test decimal with different scale fails
 TEST_F(UpdateSchemaTest, UpdateColumnDecimalDifferentScaleFails) {
-  // Try to change decimal scale (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   auto decimal_10_2 = decimal(10, 2);
   auto decimal_10_3 = decimal(10, 3);
@@ -1235,9 +1156,7 @@ TEST_F(UpdateSchemaTest, UpdateColumnDecimalDifferentScaleFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
 }
 
-// Test decimal precision narrowing fails
 TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionNarrowingFails) {
-  // Try to narrow decimal precision (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   auto decimal_20_2 = decimal(20, 2);
   auto decimal_10_2 = decimal(10, 2);
@@ -1249,18 +1168,14 @@ TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionNarrowingFails) {
   EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
 }
 
-// Test update type preserves other metadata (doc, optional)
 TEST_F(UpdateSchemaTest, UpdateTypePreservesOtherMetadata) {
-  // Add a column with metadata
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("value", int32(), "A counter value");
 
-  // Update type should preserve doc and optional
   update->UpdateColumn("value", int64());
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the type was updated but other metadata preserved
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("value"));
   ASSERT_TRUE(field_opt.has_value());
 
@@ -1270,18 +1185,14 @@ TEST_F(UpdateSchemaTest, UpdateTypePreservesOtherMetadata) {
   EXPECT_TRUE(field.optional());              // Optional preserved
 }
 
-// Test update doc preserves other metadata (type, optional)
 TEST_F(UpdateSchemaTest, UpdateDocPreservesOtherMetadata) {
-  // Add a required column with a type
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AllowIncompatibleChanges().AddRequiredColumn("id", int64(), "old 
doc");
 
-  // Update doc should preserve type and optional
   update->UpdateColumnDoc("id", "new doc");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the doc was updated but other metadata preserved
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
   ASSERT_TRUE(field_opt.has_value());
 
@@ -1291,7 +1202,6 @@ TEST_F(UpdateSchemaTest, UpdateDocPreservesOtherMetadata) {
   EXPECT_FALSE(field.optional());      // Optional preserved (still required)
 }
 
-// Test rename-delete conflict
 TEST_F(UpdateSchemaTest, RenameDeleteConflict) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32());
@@ -1306,7 +1216,6 @@ TEST_F(UpdateSchemaTest, RenameDeleteConflict) {
   EXPECT_THAT(result, HasErrorMessage("Cannot delete a column that has 
updates"));
 }
 
-// Test delete-rename conflict
 TEST_F(UpdateSchemaTest, DeleteRenameConflict) {
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("col", int32());
@@ -1321,7 +1230,6 @@ TEST_F(UpdateSchemaTest, DeleteRenameConflict) {
   EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
 }
 
-// Test case insensitive add then update
 TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdate) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(false)
@@ -1335,7 +1243,6 @@ TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdate) {
   EXPECT_EQ(*field_opt->get().type(), *int64());
 }
 
-// Test case insensitive add then update doc
 TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdateDoc) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(false)
@@ -1349,7 +1256,6 @@ TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdateDoc) {
   EXPECT_EQ(field_opt->get().doc(), "updated doc");
 }
 
-// Test case insensitive add then make optional
 TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenMakeOptional) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(false)
@@ -1364,7 +1270,6 @@ TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenMakeOptional) {
   EXPECT_TRUE(field_opt->get().optional());
 }
 
-// Test case insensitive add then require
 TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenRequire) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->CaseSensitive(false)
@@ -1374,21 +1279,16 @@ TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenRequire) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the column is now required
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("Foo", 
false));
   ASSERT_TRUE(field_opt.has_value());
   EXPECT_FALSE(field_opt->get().optional());
 }
 
-// Test mixed changes - comprehensive test combining multiple operations
 TEST_F(UpdateSchemaTest, MixedChanges) {
-  // First, create a complex schema similar to Java's SCHEMA constant
-  // Build the "preferences" struct
   auto preferences_struct = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(8, "feature1", boolean(), false),
                                SchemaField(9, "feature2", boolean(), true)});
 
-  // Build the "locations" map (address struct -> coordinate struct)
   auto address_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(20, "address", string(), false),
       SchemaField(21, "city", string(), false), SchemaField(22, "state", 
string(), false),
@@ -1402,22 +1302,18 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
       std::make_shared<MapType>(SchemaField(10, "key", address_struct, false),
                                 SchemaField(11, "value", coordinate_struct, 
false));
 
-  // Build the "points" list
   auto point_struct = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(15, "x", int64(), false), SchemaField(16, "y", int64(), 
false)});
 
   auto points_list =
       std::make_shared<ListType>(SchemaField(14, "element", point_struct, 
true));
 
-  // Build the "doubles" list
   auto doubles_list =
       std::make_shared<ListType>(SchemaField(17, "element", float64(), false));
 
-  // Build the "properties" map
   auto properties_map = std::make_shared<MapType>(
       SchemaField(18, "key", string(), true), SchemaField(19, "value", 
string(), true));
 
-  // Create the initial schema
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges()
       .AddRequiredColumn("id", int32())
@@ -1429,7 +1325,6 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
       .AddColumn("properties", properties_map, "string map of properties");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Now perform the mixed changes in a single transaction
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
 
@@ -1456,14 +1351,12 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify id: int -> long, with doc "unique id"
   ICEBERG_UNWRAP_OR_FAIL(auto id_opt, result.schema->FindFieldByName("id"));
   ASSERT_TRUE(id_opt.has_value());
   EXPECT_EQ(*id_opt->get().type(), *int64());
   EXPECT_EQ(id_opt->get().doc(), "unique id");
   EXPECT_FALSE(id_opt->get().optional());  // was required
 
-  // Verify data was renamed to json and is now required
   ICEBERG_UNWRAP_OR_FAIL(auto data_opt, 
result.schema->FindFieldByName("data"));
   ICEBERG_UNWRAP_OR_FAIL(auto json_opt, 
result.schema->FindFieldByName("json"));
   EXPECT_FALSE(data_opt.has_value());
@@ -1471,7 +1364,6 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   EXPECT_EQ(*json_opt->get().type(), *string());
   EXPECT_FALSE(json_opt->get().optional());  // now required
 
-  // Verify preferences was renamed to options, feature2 renamed to newfeature
   ICEBERG_UNWRAP_OR_FAIL(auto preferences_opt,
                          result.schema->FindFieldByName("preferences"));
   ICEBERG_UNWRAP_OR_FAIL(auto options_opt, 
result.schema->FindFieldByName("options"));
@@ -1479,7 +1371,8 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   ASSERT_TRUE(options_opt.has_value());
   EXPECT_EQ(options_opt->get().doc(), "struct of named boolean options");
 
-  const auto& options_struct = static_cast<const 
StructType&>(*options_opt->get().type());
+  const auto& options_struct =
+      checked_cast<const StructType&>(*options_opt->get().type());
   ICEBERG_UNWRAP_OR_FAIL(auto feature1_opt, 
options_struct.GetFieldByName("feature1"));
   ICEBERG_UNWRAP_OR_FAIL(auto feature2_opt, 
options_struct.GetFieldByName("feature2"));
   ICEBERG_UNWRAP_OR_FAIL(auto newfeature_opt,
@@ -1489,18 +1382,16 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   ASSERT_TRUE(newfeature_opt.has_value());
   EXPECT_EQ(*newfeature_opt->get().type(), *boolean());
 
-  // Verify locations map changes
   ICEBERG_UNWRAP_OR_FAIL(auto locations_opt, 
result.schema->FindFieldByName("locations"));
   ASSERT_TRUE(locations_opt.has_value());
   EXPECT_EQ(locations_opt->get().doc(), "map of address to coordinate");
   EXPECT_FALSE(locations_opt->get().optional());  // was required
 
   const auto& locations_map_type =
-      static_cast<const MapType&>(*locations_opt->get().type());
+      checked_cast<const MapType&>(*locations_opt->get().type());
   const auto& coord_struct =
-      static_cast<const StructType&>(*locations_map_type.value().type());
+      checked_cast<const StructType&>(*locations_map_type.value().type());
 
-  // lat renamed to latitude, type changed to double, doc added
   ICEBERG_UNWRAP_OR_FAIL(auto lat_opt, coord_struct.GetFieldByName("lat"));
   ICEBERG_UNWRAP_OR_FAIL(auto latitude_opt, 
coord_struct.GetFieldByName("latitude"));
   EXPECT_FALSE(lat_opt.has_value());
@@ -1508,16 +1399,13 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   EXPECT_EQ(*latitude_opt->get().type(), *float64());
   EXPECT_EQ(latitude_opt->get().doc(), "latitude");
 
-  // long was deleted
   ICEBERG_UNWRAP_OR_FAIL(auto long_opt, coord_struct.GetFieldByName("long"));
   EXPECT_FALSE(long_opt.has_value());
 
-  // alt was added
   ICEBERG_UNWRAP_OR_FAIL(auto alt_opt, coord_struct.GetFieldByName("alt"));
   ASSERT_TRUE(alt_opt.has_value());
   EXPECT_EQ(*alt_opt->get().type(), *float32());
 
-  // description was added as required
   ICEBERG_UNWRAP_OR_FAIL(auto description_opt,
                          coord_struct.GetFieldByName("description"));
   ASSERT_TRUE(description_opt.has_value());
@@ -1525,16 +1413,14 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   EXPECT_EQ(description_opt->get().doc(), "Location description");
   EXPECT_FALSE(description_opt->get().optional());
 
-  // Verify points list changes
   ICEBERG_UNWRAP_OR_FAIL(auto points_opt, 
result.schema->FindFieldByName("points"));
   ASSERT_TRUE(points_opt.has_value());
   EXPECT_EQ(points_opt->get().doc(), "2-D cartesian points");
 
-  const auto& points_list_type = static_cast<const 
ListType&>(*points_opt->get().type());
+  const auto& points_list_type = checked_cast<const 
ListType&>(*points_opt->get().type());
   const auto& point_elem_struct =
-      static_cast<const StructType&>(*points_list_type.element().type());
+      checked_cast<const StructType&>(*points_list_type.element().type());
 
-  // x renamed to X and made optional
   ICEBERG_UNWRAP_OR_FAIL(auto x_opt, point_elem_struct.GetFieldByName("x"));
   ICEBERG_UNWRAP_OR_FAIL(auto X_opt, point_elem_struct.GetFieldByName("X"));
   EXPECT_FALSE(x_opt.has_value());
@@ -1542,39 +1428,800 @@ TEST_F(UpdateSchemaTest, MixedChanges) {
   EXPECT_EQ(*X_opt->get().type(), *int64());
   EXPECT_TRUE(X_opt->get().optional());  // made optional
 
-  // y renamed to y.y
   ICEBERG_UNWRAP_OR_FAIL(auto y_opt, point_elem_struct.GetFieldByName("y"));
   ICEBERG_UNWRAP_OR_FAIL(auto yy_opt, point_elem_struct.GetFieldByName("y.y"));
   EXPECT_FALSE(y_opt.has_value());
   ASSERT_TRUE(yy_opt.has_value());
   EXPECT_EQ(*yy_opt->get().type(), *int64());
 
-  // z was added
   ICEBERG_UNWRAP_OR_FAIL(auto z_opt, point_elem_struct.GetFieldByName("z"));
   ASSERT_TRUE(z_opt.has_value());
   EXPECT_EQ(*z_opt->get().type(), *int64());
 
-  // t.t was added with doc
   ICEBERG_UNWRAP_OR_FAIL(auto tt_opt, point_elem_struct.GetFieldByName("t.t"));
   ASSERT_TRUE(tt_opt.has_value());
   EXPECT_EQ(*tt_opt->get().type(), *int64());
   EXPECT_EQ(tt_opt->get().doc(), "name with '.'");
 
-  // Verify doubles list unchanged
   ICEBERG_UNWRAP_OR_FAIL(auto doubles_opt, 
result.schema->FindFieldByName("doubles"));
   ASSERT_TRUE(doubles_opt.has_value());
   EXPECT_EQ(doubles_opt->get().type()->type_id(), TypeId::kList);
 
-  // Verify properties was deleted
   ICEBERG_UNWRAP_OR_FAIL(auto properties_opt,
                          result.schema->FindFieldByName("properties"));
   EXPECT_FALSE(properties_opt.has_value());
 
-  // Verify toplevel was added
   ICEBERG_UNWRAP_OR_FAIL(auto toplevel_opt, 
result.schema->FindFieldByName("toplevel"));
   ASSERT_TRUE(toplevel_opt.has_value());
   EXPECT_EQ(*toplevel_opt->get().type(), *decimal(9, 2));
   EXPECT_TRUE(toplevel_opt->get().optional());
 }
 
+// ============================================================================
+// Move Operations Tests
+// ============================================================================
+
+TEST_F(UpdateSchemaTest, TestMultipleMoves) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  setup->AddColumn("w", int64());
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+
+  update->MoveFirst("w").MoveFirst("z").MoveAfter("y", "w").MoveBefore("w", "x");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+  const auto& fields = result.schema->fields();
+  ASSERT_GE(fields.size(), 2u);  // guards the fields[0]/fields[1] reads below
+  EXPECT_EQ(fields[0].name(), "z");
+  EXPECT_EQ(fields[1].name(), "y");
+
+  int w_pos = -1;  // -1 means "not found"
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "w") w_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GT(w_pos, 0);  // found, and not in the leading slot
+  EXPECT_GT(x_pos, 0);
+  EXPECT_LT(w_pos, x_pos);  // w should come before x
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelColumnFirst) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->MoveFirst("y");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ASSERT_FALSE(result.schema->fields().empty());  // front() needs a non-empty schema
+  EXPECT_EQ(result.schema->fields().front().name(), "y");  // y now leads the schema
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelColumnBeforeFirst) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->MoveBefore("y", "x");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int y_pos = -1;  // -1 means "not found"
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "y") y_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(y_pos, 0);
+  EXPECT_GE(x_pos, 0);
+  EXPECT_LT(y_pos, x_pos);  // y should come before x
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelColumnAfterLast) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->MoveAfter("x", "z");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ASSERT_FALSE(result.schema->fields().empty());  // back() needs a non-empty schema
+  EXPECT_EQ(result.schema->fields().back().name(), "x");  // x is now the last column
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelColumnAfter) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  setup->AddColumn("w", timestamp_tz());
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveAfter("w", "x");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int w_pos = -1;  // -1 means "not found"
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "w") w_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(w_pos, 0);
+  EXPECT_GE(x_pos, 0);
+  EXPECT_EQ(w_pos, x_pos + 1);  // w should be immediately after x
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelColumnBefore) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  setup->AddColumn("w", timestamp_tz());
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("w", "z");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int w_pos = -1;  // -1 means "not found"
+  int z_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "w") w_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "z") z_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(w_pos, 0);
+  EXPECT_GE(z_pos, 0);
+  EXPECT_LT(w_pos, z_pos);  // w should come before z
+}
+
+TEST_F(UpdateSchemaTest, TestMoveNestedFieldFirst) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(100, "a", int64(), true), SchemaField(101, "b", int64(), true)});
+  setup->AddColumn("s", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveFirst("s.b");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto s_opt, result.schema->FindFieldByName("s"));
+  ASSERT_TRUE(s_opt.has_value());
+  const auto& s_struct = checked_cast<const StructType&>(*s_opt->get().type());
+
+  ASSERT_FALSE(s_struct.fields().empty());  // front() needs at least one field
+  EXPECT_EQ(s_struct.fields().front().name(), "b");  // b now leads struct s
+}
+
+TEST_F(UpdateSchemaTest, TestMoveNestedFieldBeforeFirst) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(100, "a", int64(), true), SchemaField(101, "b", int64(), true)});
+  setup->AddColumn("s", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("s.b", "s.a");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto s_opt, result.schema->FindFieldByName("s"));
+  ASSERT_TRUE(s_opt.has_value());
+  const auto& s_struct = checked_cast<const StructType&>(*s_opt->get().type());
+
+  const auto& fields = s_struct.fields();
+  int a_pos = -1;  // -1 means "not found"
+  int b_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "a") a_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "b") b_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(a_pos, 0);
+  EXPECT_GE(b_pos, 0);
+  EXPECT_LT(b_pos, a_pos);  // b should come before a
+}
+
+TEST_F(UpdateSchemaTest, TestMoveNestedFieldAfterLast) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(100, "a", int64(), true), SchemaField(101, "b", int64(), true)});
+  setup->AddColumn("s", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveAfter("s.a", "s.b");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto s_opt, result.schema->FindFieldByName("s"));
+  ASSERT_TRUE(s_opt.has_value());
+  const auto& s_struct = checked_cast<const StructType&>(*s_opt->get().type());
+
+  ASSERT_FALSE(s_struct.fields().empty());  // back() needs at least one field
+  EXPECT_EQ(s_struct.fields().back().name(), "a");  // a is now the last field of s
+}
+
+TEST_F(UpdateSchemaTest, TestMoveNestedFieldAfter) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(100, "a", int64(), true), SchemaField(101, "b", int64(), true),
+      SchemaField(102, "c", timestamp_tz(), false)});
+  setup->AddColumn("s", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveAfter("s.c", "s.a");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto s_opt, result.schema->FindFieldByName("s"));
+  ASSERT_TRUE(s_opt.has_value());
+  const auto& s_struct = checked_cast<const StructType&>(*s_opt->get().type());
+
+  const auto& fields = s_struct.fields();
+  int a_pos = -1;  // -1 means "not found"
+  int c_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "a") a_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "c") c_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(a_pos, 0);
+  EXPECT_GE(c_pos, 0);
+  EXPECT_EQ(c_pos, a_pos + 1);  // c should be immediately after a
+}
+
+TEST_F(UpdateSchemaTest, TestMoveNestedFieldBefore) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(102, "c", timestamp_tz(), false), SchemaField(100, "a", int64(), true),
+      SchemaField(101, "b", int64(), true)});
+  setup->AddColumn("s", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("s.c", "s.b");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto s_opt, result.schema->FindFieldByName("s"));
+  ASSERT_TRUE(s_opt.has_value());
+  const auto& s_struct = checked_cast<const StructType&>(*s_opt->get().type());
+
+  const auto& fields = s_struct.fields();
+  int b_pos = -1;  // -1 means "not found"
+  int c_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "b") b_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "c") c_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(b_pos, 0);
+  EXPECT_GE(c_pos, 0);
+  EXPECT_LT(c_pos, b_pos);  // c should come before b
+}
+
+TEST_F(UpdateSchemaTest, TestMoveListElementField) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto elem_struct = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(100, "a", int64(), true), SchemaField(101, "b", int64(), true)});
+  auto list_type =
+      std::make_shared<ListType>(SchemaField(99, "element", elem_struct, false));
+  setup->AddColumn("list", list_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveAfter("list.a", "list.b");  // names address the element struct's fields
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto list_opt, result.schema->FindFieldByName("list"));
+  ASSERT_TRUE(list_opt.has_value());
+  const auto& list_type_result = checked_cast<const ListType&>(*list_opt->get().type());
+  const auto& elem_struct_result =
+      checked_cast<const StructType&>(*list_type_result.element().type());
+
+  const auto& fields = elem_struct_result.fields();
+  int a_pos = -1;  // -1 means "not found"
+  int b_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "a") a_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "b") b_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(a_pos, 0);
+  EXPECT_GE(b_pos, 0);
+  EXPECT_GT(a_pos, b_pos);  // a should come after b
+}
+
+TEST_F(UpdateSchemaTest, TestMoveMapValueStructField) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto value_struct = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "lat", float64(), true),
+                               SchemaField(101, "long", float64(), true)});
+  auto map_type =
+      std::make_shared<MapType>(SchemaField(98, "key", string(), false),
+                                SchemaField(99, "value", value_struct, false));
+  setup->AddColumn("locations", map_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveAfter("locations.lat", "locations.long");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto locs_opt, result.schema->FindFieldByName("locations"));
+  ASSERT_TRUE(locs_opt.has_value());
+  const auto& locs_map = checked_cast<const MapType&>(*locs_opt->get().type());
+  const auto& value_struct_result =
+      checked_cast<const StructType&>(*locs_map.value().type());
+
+  const auto& fields = value_struct_result.fields();
+  int lat_pos = -1;  // -1 means "not found"
+  int long_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "lat") lat_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "long") long_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(lat_pos, 0);
+  EXPECT_GE(long_pos, 0);
+  EXPECT_GT(lat_pos, long_pos);  // lat should come after long
+}
+
+TEST_F(UpdateSchemaTest, TestMoveAddedTopLevelColumn) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("ts", timestamp_tz()).MoveAfter("ts", "x");  // add and move together
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int ts_pos = -1;  // -1 means "not found"
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "ts") ts_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(ts_pos, 0);
+  EXPECT_GE(x_pos, 0);
+  EXPECT_EQ(ts_pos, x_pos + 1);  // ts should be immediately after x
+}
+
+TEST_F(UpdateSchemaTest, TestMoveAddedTopLevelColumnAfterAddedColumn) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("ts", timestamp_tz())
+      .AddColumn("count", int64())
+      .MoveAfter("ts", "x")
+      .MoveAfter("count", "ts");  // the reference column is itself newly added
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int x_pos = -1;  // -1 means "not found"
+  int ts_pos = -1;
+  int count_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);  // explicit narrowing
+    if (fields[i].name() == "ts") ts_pos = static_cast<int>(i);
+    if (fields[i].name() == "count") count_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(x_pos, 0);
+  EXPECT_GE(ts_pos, 0);
+  EXPECT_GE(count_pos, 0);
+  EXPECT_EQ(ts_pos, x_pos + 1);      // ts should be immediately after x
+  EXPECT_EQ(count_pos, ts_pos + 1);  // count should be immediately after ts
+}
+
+TEST_F(UpdateSchemaTest, TestMoveAddedNestedStructField) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true),
+                               SchemaField(101, "feature2", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->AddColumn("preferences", "ts", timestamp_tz())
+      .MoveBefore("preferences.ts", "preferences.feature1");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt, result.schema->FindFieldByName("preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct = checked_cast<const StructType&>(*prefs_opt->get().type());
+
+  ASSERT_FALSE(prefs_struct.fields().empty());  // front() needs at least one field
+  EXPECT_EQ(prefs_struct.fields().front().name(), "ts");  // ts leads the struct
+}
+
+TEST_F(UpdateSchemaTest, TestMoveAddedNestedStructFieldBeforeAddedColumn) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true),
+                               SchemaField(101, "feature2", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->AddColumn("preferences", "ts", timestamp_tz())
+      .AddColumn("preferences", "size", int64())
+      .MoveBefore("preferences.ts", "preferences.feature1")
+      .MoveBefore("preferences.size", "preferences.ts");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt, result.schema->FindFieldByName("preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct = checked_cast<const StructType&>(*prefs_opt->get().type());
+  const auto& fields = prefs_struct.fields();
+  ASSERT_GE(fields.size(), 2u);  // guards the fields[0]/fields[1] reads below
+  EXPECT_EQ(fields[0].name(), "size");
+  EXPECT_EQ(fields[1].name(), "ts");
+}
+
+TEST_F(UpdateSchemaTest, TestMoveSelfReferenceFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto before_update, table_->NewUpdateSchema());
+  before_update->MoveBefore("x", "x");  // column and reference are the same
+  auto before_result = before_update->Apply();
+  EXPECT_THAT(before_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(before_result, HasErrorMessage("Cannot move x before itself"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto after_update, table_->NewUpdateSchema());
+  after_update->MoveAfter("x", "x");  // same check for the "after" variant
+  auto after_result = after_update->Apply();
+  EXPECT_THAT(after_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(after_result, HasErrorMessage("Cannot move x after itself"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveMissingColumnFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto first_update, table_->NewUpdateSchema());
+  first_update->MoveFirst("items");  // "items" is the column expected to be missing
+  auto first_result = first_update->Apply();
+  EXPECT_THAT(first_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(first_result, HasErrorMessage("Cannot move missing column: items"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto before_update, table_->NewUpdateSchema());
+  before_update->MoveBefore("items", "x");
+  auto before_result = before_update->Apply();
+  EXPECT_THAT(before_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(before_result, HasErrorMessage("Cannot move missing column: items"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto after_update, table_->NewUpdateSchema());
+  after_update->MoveAfter("items", "y");
+  auto after_result = after_update->Apply();
+  EXPECT_THAT(after_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(after_result, HasErrorMessage("Cannot move missing column: items"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveBeforeAddFails) {
+  // The move is requested before "ts" is added, so the name cannot be resolved.
+  ICEBERG_UNWRAP_OR_FAIL(auto schema_update, table_->NewUpdateSchema());
+  schema_update->MoveBefore("ts", "x");
+  schema_update->AddColumn("ts", timestamp_tz());
+  auto applied = schema_update->Apply();
+  EXPECT_THAT(applied, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(applied, HasErrorMessage("Cannot move missing column: ts"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveMissingReferenceColumnFails) {
+  // The moved column resolves, but the reference column "items" does not.
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, table_->NewUpdateSchema());
+  update1->MoveBefore("x", "items");
+  auto result1 = update1->Apply();
+  EXPECT_THAT(result1, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result1,
+              HasErrorMessage("Cannot move x before missing column: items"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto update2, table_->NewUpdateSchema());
+  update2->MoveAfter("y", "items");
+  auto result2 = update2->Apply();
+  EXPECT_THAT(result2, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result2,
+              HasErrorMessage("Cannot move y after missing column: items"));
+}
+
+TEST_F(UpdateSchemaTest, TestMovePrimitiveMapKeyFails) {
+  // Setup: commit a map column named "properties".
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto map_type = std::make_shared<MapType>(
+      SchemaField(98, "key", string(), false),
+      SchemaField(99, "value", string(), false));
+
+  setup->AllowIncompatibleChanges().AddRequiredColumn("properties", map_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // The parent of "properties.key" is a map, not a struct, so the move is rejected.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("properties.key", "properties.value");
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot move fields in non-struct type"));
+}
+
+TEST_F(UpdateSchemaTest, TestMovePrimitiveMapValueFails) {
+  // Setup: commit a map column named "properties".
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto map_type = std::make_shared<MapType>(
+      SchemaField(98, "key", string(), false),
+      SchemaField(99, "value", string(), false));
+
+  setup->AllowIncompatibleChanges().AddRequiredColumn("properties", map_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // The parent of "properties.value" is a map, not a struct, so the move is rejected.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("properties.value", "properties.key");
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot move fields in non-struct type"));
+}
+
+TEST_F(UpdateSchemaTest, TestMovePrimitiveListElementFails) {
+  // Setup: commit a list column named "doubles".
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto list_type =
+      std::make_shared<ListType>(SchemaField(98, "element", float64(), false));
+
+  setup->AllowIncompatibleChanges().AddRequiredColumn("doubles", list_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // The parent of "doubles.element" is a list, not a struct, so the move is rejected.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("doubles.element", "doubles");
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot move fields in non-struct type"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopLevelBetweenStructsFails) {
+  // Setup: commit a struct column "preferences" with one nested field.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // A top-level column cannot be placed relative to a nested field.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("x", "preferences.feature1");
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot move field x to a different struct"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveBetweenStructsFails) {
+  // Setup: commit two independent struct columns, each with one nested field.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct1 = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "a", int64(), true)});
+  auto struct2 = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(101, "feature1", string(), true)});
+  setup->AddColumn("points", struct1);
+  setup->AddColumn("preferences", struct2);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // The moved field and the reference field live in different parent structs.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MoveBefore("points.a", "preferences.feature1");
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(
+      result,
+      HasErrorMessage("Cannot move field points.a to a different struct"));
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopDeletedColumnAfterAnotherColumn) {
+  // "z" is deleted and re-added in the same update, then moved after "y".
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("z")
+      .AddRequiredColumn("z", int32())
+      .MoveAfter("z", "y");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int z_pos = -1;
+  int y_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    // Explicit cast: the index is size_t while -1 is used as the "not found" marker.
+    if (fields[i].name() == "z") z_pos = static_cast<int>(i);
+    if (fields[i].name() == "y") y_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(z_pos, 0);
+  EXPECT_GE(y_pos, 0);
+  EXPECT_GT(z_pos, y_pos);
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopDeletedColumnBeforeAnotherColumn) {
+  // "z" is deleted and re-added in the same update, then moved before "x".
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("z")
+      .AddRequiredColumn("z", int32())
+      .MoveBefore("z", "x");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int z_pos = -1;
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    // Explicit cast: the index is size_t while -1 is used as the "not found" marker.
+    if (fields[i].name() == "z") z_pos = static_cast<int>(i);
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(z_pos, 0);
+  EXPECT_GE(x_pos, 0);
+  EXPECT_LT(z_pos, x_pos);
+}
+
+TEST_F(UpdateSchemaTest, TestMoveTopDeletedColumnToFirst) {
+  // "z" is deleted and re-added in the same update, then moved to the front.
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("z")
+      .AddRequiredColumn("z", int32())
+      .MoveFirst("z");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  // Guard the index below so an empty schema fails the test instead of reading
+  // out of bounds.
+  ASSERT_FALSE(fields.empty());
+  EXPECT_EQ(fields[0].name(), "z");
+}
+
+TEST_F(UpdateSchemaTest, TestMoveDeletedNestedStructFieldAfterAnotherColumn) {
+  // Setup: commit a struct column "preferences" with two nested fields.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true),
+                               SchemaField(101, "feature2", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // Delete and re-add "feature1" in one update, then move it after "feature2".
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("preferences.feature1")
+      .AddRequiredColumn("preferences", "feature1", boolean())
+      .MoveAfter("preferences.feature1", "preferences.feature2");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt,
+                         result.schema->FindFieldByName("preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct =
+      checked_cast<const StructType&>(*prefs_opt->get().type());
+
+  const auto& fields = prefs_struct.fields();
+  // Guard the access below so an empty struct fails the test instead of
+  // underflowing the index.
+  ASSERT_FALSE(fields.empty());
+  EXPECT_EQ(fields.back().name(), "feature1");
+}
+
+TEST_F(UpdateSchemaTest, TestMoveDeletedNestedStructFieldBeforeAnotherColumn) {
+  // Setup: commit a struct column "preferences" with two nested fields.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true),
+                               SchemaField(101, "feature2", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // Delete and re-add "feature2" in one update, then move it before "feature1".
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("preferences.feature2")
+      .AddColumn("preferences", "feature2", boolean())
+      .MoveBefore("preferences.feature2", "preferences.feature1");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt,
+                         result.schema->FindFieldByName("preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct =
+      checked_cast<const StructType&>(*prefs_opt->get().type());
+
+  const auto& fields = prefs_struct.fields();
+  // Guard the index below so an empty struct fails the test instead of reading
+  // out of bounds.
+  ASSERT_FALSE(fields.empty());
+  EXPECT_EQ(fields[0].name(), "feature2");
+}
+
+TEST_F(UpdateSchemaTest, TestMoveDeletedNestedStructFieldToFirst) {
+  // Setup: commit a struct column "preferences" with two nested fields.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "feature1", string(), true),
+                               SchemaField(101, "feature2", string(), true)});
+  setup->AddColumn("preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // Delete and re-add "feature2" in one update, then move it to the front.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->AllowIncompatibleChanges()
+      .DeleteColumn("preferences.feature2")
+      .AddColumn("preferences", "feature2", boolean())
+      .MoveFirst("preferences.feature2");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt,
+                         result.schema->FindFieldByName("preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct =
+      checked_cast<const StructType&>(*prefs_opt->get().type());
+
+  const auto& fields = prefs_struct.fields();
+  // Guard the index below so an empty struct fails the test instead of reading
+  // out of bounds.
+  ASSERT_FALSE(fields.empty());
+  EXPECT_EQ(fields[0].name(), "feature2");
+}
+
+TEST_F(UpdateSchemaTest, TestCaseInsensitiveAddTopLevelAndMove) {
+  // With case sensitivity off, "ts"/"X" must resolve to the columns "TS"/"x".
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AddColumn("TS", timestamp_tz())
+      .MoveAfter("ts", "X");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int ts_pos = -1;
+  int x_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    // Explicit cast: the index is size_t while -1 is used as the "not found" marker.
+    if (fields[i].name() == "TS") ts_pos = static_cast<int>(i);
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(ts_pos, 0);
+  EXPECT_GE(x_pos, 0);
+  EXPECT_EQ(ts_pos, x_pos + 1);
+}
+
+TEST_F(UpdateSchemaTest, TestCaseInsensitiveAddNestedAndMove) {
+  // Setup: commit a struct column "Preferences" with one mixed-case nested field.
+  ICEBERG_UNWRAP_OR_FAIL(auto setup, table_->NewUpdateSchema());
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "Feature1", string(), true)});
+  setup->AddColumn("Preferences", struct_type);
+  EXPECT_THAT(setup->Commit(), IsOk());
+
+  // Names passed to MoveBefore differ from the stored names only by case.
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AddColumn("Preferences", "TS", timestamp_tz())
+      .MoveBefore("preferences.ts", "PREFERENCES.Feature1");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto prefs_opt,
+                         result.schema->FindFieldByName("Preferences"));
+  ASSERT_TRUE(prefs_opt.has_value());
+  const auto& prefs_struct =
+      checked_cast<const StructType&>(*prefs_opt->get().type());
+
+  const auto& fields = prefs_struct.fields();
+  // Guard the index below so an empty struct fails the test instead of reading
+  // out of bounds.
+  ASSERT_FALSE(fields.empty());
+  EXPECT_EQ(fields[0].name(), "TS");
+}
+
+TEST_F(UpdateSchemaTest, TestCaseInsensitiveMoveAfterNewlyAddedField) {
+  // Both the moved column and its reference are added in this same update and
+  // are addressed with different casing.
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AddColumn("TS", timestamp_tz())
+      .AddColumn("Count", int64())
+      .MoveAfter("ts", "X")
+      .MoveAfter("count", "TS");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_TRUE(result.schema != nullptr);
+
+  const auto& fields = result.schema->fields();
+  int x_pos = -1;
+  int ts_pos = -1;
+  int count_pos = -1;
+  for (size_t i = 0; i < fields.size(); ++i) {
+    // Explicit cast: the index is size_t while -1 is used as the "not found" marker.
+    if (fields[i].name() == "x") x_pos = static_cast<int>(i);
+    if (fields[i].name() == "TS") ts_pos = static_cast<int>(i);
+    if (fields[i].name() == "Count") count_pos = static_cast<int>(i);
+  }
+  EXPECT_GE(x_pos, 0);
+  EXPECT_GE(ts_pos, 0);
+  EXPECT_GE(count_pos, 0);
+  EXPECT_EQ(ts_pos, x_pos + 1);
+  EXPECT_EQ(count_pos, ts_pos + 1);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc
index e339ef04..a4c45349 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -52,8 +52,12 @@ class ApplyChangesVisitor {
   ApplyChangesVisitor(
       const std::unordered_set<int32_t>& deletes,
       const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
-      const std::unordered_map<int32_t, std::vector<int32_t>>& 
parent_to_added_ids)
-      : deletes_(deletes), updates_(updates), 
parent_to_added_ids_(parent_to_added_ids) {}
+      const std::unordered_map<int32_t, std::vector<int32_t>>& 
parent_to_added_ids,
+      const std::unordered_map<int32_t, std::vector<UpdateSchema::Move>>& 
moves)
+      : deletes_(deletes),
+        updates_(updates),
+        parent_to_added_ids_(parent_to_added_ids),
+        moves_(moves) {}
 
   /// \brief Apply changes to a type using schema visitor pattern
   Result<std::shared_ptr<Type>> ApplyChanges(const std::shared_ptr<Type>& type,
@@ -98,6 +102,12 @@ class ApplyChangesVisitor {
       }
     }
 
+    auto moves_it = moves_.find(parent_id);
+    if (moves_it != moves_.end() && !moves_it->second.empty()) {
+      has_changes = true;
+      new_fields = MoveFields(std::move(new_fields), moves_it->second);
+    }
+
     if (!has_changes) {
       return base_type;
     }
@@ -205,11 +215,64 @@ class ApplyChangesVisitor {
     }
   }
 
+  /// \brief Helper function to apply move operations to a list of fields
+  ///
+  /// \param fields Sibling fields to reorder; taken by rvalue reference and
+  ///        moved from.
+  /// \param moves Move operations, applied one after another in the given order.
+  /// \return The reordered fields.
+  static std::vector<SchemaField> MoveFields(
+      std::vector<SchemaField>&& fields,
+      const std::vector<UpdateSchema::Move>& moves);
+
   const std::unordered_set<int32_t>& deletes_;
   const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates_;
   const std::unordered_map<int32_t, std::vector<int32_t>>& 
parent_to_added_ids_;
+  const std::unordered_map<int32_t, std::vector<UpdateSchema::Move>>& moves_;
 };
 
+/// Applies each move, in order, to the sibling fields of one struct.
+///
+/// A move whose field is not present in `fields` is ignored. A kBefore/kAfter
+/// move whose reference field is not present leaves the moved field at its
+/// current position rather than dropping it from the result. NOTE(review): the
+/// reference can presumably be absent when it was deleted in the same update;
+/// confirm whether that case should be rejected earlier with an error instead.
+std::vector<SchemaField> ApplyChangesVisitor::MoveFields(
+    std::vector<SchemaField>&& fields,
+    const std::vector<UpdateSchema::Move>& moves) {
+  std::vector<SchemaField> reordered = std::move(fields);
+
+  // Locates a field by ID in the current (possibly already reordered) list.
+  const auto find_by_id = [&reordered](int32_t id) {
+    return std::ranges::find_if(reordered, [id](const SchemaField& field) {
+      return field.field_id() == id;
+    });
+  };
+
+  for (const auto& move : moves) {
+    const auto source_it = find_by_id(move.field_id);
+    if (source_it == reordered.end()) {
+      continue;
+    }
+
+    // Remember the original slot so the field can be put back there if the
+    // reference field cannot be found after the erase.
+    const auto original_offset = source_it - reordered.begin();
+    SchemaField to_move = *source_it;
+    reordered.erase(source_it);
+
+    // Default destination restores the original position.
+    auto destination = reordered.begin() + original_offset;
+    switch (move.type) {
+      case UpdateSchema::Move::MoveType::kFirst:
+        destination = reordered.begin();
+        break;
+
+      case UpdateSchema::Move::MoveType::kBefore: {
+        const auto reference_it = find_by_id(move.reference_field_id);
+        if (reference_it != reordered.end()) {
+          destination = reference_it;
+        }
+        break;
+      }
+
+      case UpdateSchema::Move::MoveType::kAfter: {
+        const auto reference_it = find_by_id(move.reference_field_id);
+        if (reference_it != reordered.end()) {
+          destination = reference_it + 1;
+        }
+        break;
+      }
+    }
+
+    reordered.insert(destination, std::move(to_move));
+  }
+
+  return reordered;
+}
+
 }  // namespace
 
 Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
@@ -244,6 +307,25 @@ UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> 
transaction)
 
 UpdateSchema::~UpdateSchema() = default;
 
+/// Builds a move that places `field_id` first among its sibling fields.
+/// kFirst needs no reference field, so kTableRootId is stored as a placeholder.
+UpdateSchema::Move UpdateSchema::Move::First(int32_t field_id) {
+  return Move{.field_id = field_id,
+              .reference_field_id = kTableRootId,
+              .type = MoveType::kFirst};
+}
+
+/// Builds a move that places `field_id` before `reference_field_id`.
+UpdateSchema::Move UpdateSchema::Move::Before(int32_t field_id,
+                                              int32_t reference_field_id) {
+  Move move{};
+  move.field_id = field_id;
+  move.reference_field_id = reference_field_id;
+  move.type = MoveType::kBefore;
+  return move;
+}
+
+/// Builds a move that places `field_id` after `reference_field_id`.
+UpdateSchema::Move UpdateSchema::Move::After(int32_t field_id,
+                                             int32_t reference_field_id) {
+  Move move{};
+  move.field_id = field_id;
+  move.reference_field_id = reference_field_id;
+  move.type = MoveType::kAfter;
+  return move;
+}
+
 UpdateSchema& UpdateSchema::AllowIncompatibleChanges() {
   allow_incompatible_changes_ = true;
   return *this;
@@ -421,23 +503,33 @@ UpdateSchema& UpdateSchema::DeleteColumn(std::string_view 
name) {
 }
 
 UpdateSchema& UpdateSchema::MoveFirst(std::string_view name) {
-  // TODO(Guotao Yu): Implement MoveFirst
-  AddError(NotImplemented("UpdateSchema::MoveFirst not implemented"));
-  return *this;
+  // Resolve the column name to a field ID. NOTE(review): the builder macro
+  // presumably records the lookup error and returns early on failure; confirm
+  // against its definition.
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_id, FindFieldIdForMove(name));
+
+  // Validation and recording of the move happen in MoveInternal.
+  return MoveInternal(name, Move::First(field_id));
 }
 
 UpdateSchema& UpdateSchema::MoveBefore(std::string_view name,
                                        std::string_view before_name) {
-  // TODO(Guotao Yu): Implement MoveBefore
-  AddError(NotImplemented("UpdateSchema::MoveBefore not implemented"));
-  return *this;
+  // Resolve both names up front. A missing reference column gets its own
+  // message that names both columns.
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_id, FindFieldIdForMove(name));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR(
+      auto before_id, FindFieldIdForMove(before_name),
+      "Cannot move {} before missing column: {}", name, before_name);
+
+  // Compare resolved IDs, not names, to reject a self-referencing move.
+  ICEBERG_BUILDER_CHECK(field_id != before_id,
+                        "Cannot move {} before itself", name);
+
+  return MoveInternal(name, Move::Before(field_id, before_id));
 }
 
 UpdateSchema& UpdateSchema::MoveAfter(std::string_view name,
                                       std::string_view after_name) {
-  // TODO(Guotao Yu): Implement MoveAfter
-  AddError(NotImplemented("UpdateSchema::MoveAfter not implemented"));
-  return *this;
+  // Resolve both names up front. A missing reference column gets its own
+  // message that names both columns.
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_id, FindFieldIdForMove(name));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR(
+      auto after_id, FindFieldIdForMove(after_name),
+      "Cannot move {} after missing column: {}", name, after_name);
+
+  // Compare resolved IDs, not names, to reject a self-referencing move.
+  ICEBERG_BUILDER_CHECK(field_id != after_id,
+                        "Cannot move {} after itself", name);
+
+  return MoveInternal(name, Move::After(field_id, after_id));
 }
 
 UpdateSchema& UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> 
new_schema) {
@@ -478,7 +570,7 @@ Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
     }
   }
 
-  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_, 
moves_);
   ICEBERG_ASSIGN_OR_RAISE(auto new_type, visitor.ApplyChanges(schema_, 
kTableRootId));
 
   auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
@@ -629,4 +721,53 @@ std::string 
UpdateSchema::CaseSensitivityAwareName(std::string_view name) const
   return StringUtils::ToLower(name);
 }
 
+/// Resolves a column name to the field ID used by move operations.
+/// Columns added in this update are consulted before FindField; an
+/// InvalidArgument error is returned when neither lookup yields a field.
+Result<int32_t> UpdateSchema::FindFieldIdForMove(std::string_view name) const {
+  if (auto pending = added_name_to_id_.find(CaseSensitivityAwareName(name));
+      pending != added_name_to_id_.end()) {
+    return pending->second;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto existing, FindField(name));
+  if (!existing.has_value()) {
+    return InvalidArgument("Cannot move missing column: {}", name);
+  }
+
+  return existing->get().field_id();
+}
+
+/// Validates a move and records it under the parent of the moved field.
+///
+/// A field with no entry in id_to_parent_ is treated as top-level and its move
+/// is recorded under kTableRootId. Otherwise the parent must be a struct. For
+/// kBefore/kAfter the reference field must share the same parent.
+///
+/// \param name Column name as given by the caller; only used in error messages.
+/// \param move The move operation to validate and record.
+/// \return Reference to this for method chaining.
+UpdateSchema& UpdateSchema::MoveInternal(std::string_view name, const Move& move) {
+  // kFirst has no reference field, so only relative moves need the parent check.
+  const bool is_relative =
+      move.type == Move::MoveType::kBefore || move.type == Move::MoveType::kAfter;
+
+  const auto parent_it = id_to_parent_.find(move.field_id);
+  if (parent_it == id_to_parent_.end()) {
+    // Top-level field: the reference must be top-level as well.
+    if (is_relative) {
+      ICEBERG_BUILDER_CHECK(
+          id_to_parent_.find(move.reference_field_id) == id_to_parent_.end(),
+          "Cannot move field {} to a different struct", name);
+    }
+
+    moves_[kTableRootId].push_back(move);
+    return *this;
+  }
+
+  const int32_t parent_id = parent_it->second;
+  auto parent_field_result = schema_->FindFieldById(parent_id);
+
+  // Check the lookup result and the optional it carries before dereferencing;
+  // a successful lookup can still yield no field.
+  // NOTE(review): assumes FindFieldById returns a Result wrapping an optional,
+  // like FindFieldByName; confirm.
+  ICEBERG_BUILDER_CHECK(
+      parent_field_result.has_value() && parent_field_result.value().has_value(),
+      "Cannot find parent field with id: {}", parent_id);
+
+  const auto& parent_field = parent_field_result.value()->get();
+  ICEBERG_BUILDER_CHECK(parent_field.type()->type_id() == TypeId::kStruct,
+                        "Cannot move fields in non-struct type");
+
+  if (is_relative) {
+    const auto ref_parent_it = id_to_parent_.find(move.reference_field_id);
+    ICEBERG_BUILDER_CHECK(
+        ref_parent_it != id_to_parent_.end() && ref_parent_it->second == parent_id,
+        "Cannot move field {} to a different struct", name);
+  }
+
+  moves_[parent_id].push_back(move);
+  return *this;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/update_schema.h b/src/iceberg/update/update_schema.h
index e87c9bcc..a1c3e92d 100644
--- a/src/iceberg/update/update_schema.h
+++ b/src/iceberg/update/update_schema.h
@@ -317,6 +317,21 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
   /// \return Reference to this for method chaining.
   UpdateSchema& CaseSensitive(bool case_sensitive);
 
+  /// \brief Represents a column move operation within a struct (internal use
+  /// only).
+  struct Move {
+    /// Placement kind: first among the siblings, or directly before/after the
+    /// reference field.
+    enum class MoveType { kFirst, kBefore, kAfter };
+
+    /// ID of the field being moved.
+    int32_t field_id;
+    int32_t reference_field_id;  // Only used for kBefore and kAfter
+    /// Kind of placement to perform.
+    MoveType type;
+
+    /// Create a move that places `field_id` first.
+    static Move First(int32_t field_id);
+
+    /// Create a move that places `field_id` before `reference_field_id`.
+    static Move Before(int32_t field_id, int32_t reference_field_id);
+
+    /// Create a move that places `field_id` after `reference_field_id`.
+    static Move After(int32_t field_id, int32_t reference_field_id);
+  };
+
   Kind kind() const final { return Kind::kUpdateSchema; }
 
   struct ApplyResult {
@@ -380,6 +395,12 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
   /// \return The normalized field name.
   std::string CaseSensitivityAwareName(std::string_view name) const;
 
+  /// \brief Find a field ID for move operations.
+  ///
+  /// Columns added by this update are consulted first (using the
+  /// case-sensitivity-aware name), then FindField. Returns an InvalidArgument
+  /// error ("Cannot move missing column: ...") when neither yields a field.
+  Result<int32_t> FindFieldIdForMove(std::string_view name) const;
+
+  /// \brief Internal implementation for recording a move operation.
+  ///
+  /// \param name Column name as given by the caller; used in error messages.
+  /// \param move The move operation to validate and record.
+  /// \return Reference to this for method chaining.
+  UpdateSchema& MoveInternal(std::string_view name, const Move& move);
+
   // Internal state
   std::shared_ptr<Schema> schema_;
   int32_t last_column_id_;
@@ -398,6 +419,8 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
   std::unordered_map<int32_t, std::vector<int32_t>> parent_to_added_ids_;
   // full name -> field ID for added fields
   std::unordered_map<std::string, int32_t> added_name_to_id_;
+  // parent ID -> move operations
+  std::unordered_map<int32_t, std::vector<Move>> moves_;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/util/error_collector.h b/src/iceberg/util/error_collector.h
index 5eb055a9..f07e127a 100644
--- a/src/iceberg/util/error_collector.h
+++ b/src/iceberg/util/error_collector.h
@@ -45,6 +45,17 @@ namespace iceberg {
   ICEBERG_BUILDER_ASSIGN_OR_RETURN_IMPL(             \
       ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, rexpr)
 
+/// \brief Evaluate `rexpr`; on success move its value into `lhs`, on failure
+/// record a kInvalidArgument error built from the trailing format arguments and
+/// return the result of AddError from the enclosing function.
+///
+/// The error carried by `rexpr` itself is not propagated; only the supplied
+/// message is recorded. This expands to several statements (it declares `lhs`),
+/// so it must not be used as the unbraced body of an `if` or loop.
+#define ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR_IMPL(result_name, lhs, rexpr, ...) \
+  auto&& result_name = (rexpr); \
+  if (!result_name) [[unlikely]] { \
+    return AddError(ErrorKind::kInvalidArgument, __VA_ARGS__); \
+  } \
+  lhs = std::move(result_name.value());
+
+/// \brief Public form of the macro above; generates a unique temporary name.
+#define ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR(lhs, rexpr, ...) \
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR_IMPL( \
+      ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, rexpr, __VA_ARGS__)
+
+
 #define ICEBERG_BUILDER_CHECK(expr, ...)                         \
   do {                                                           \
     if (!(expr)) [[unlikely]] {                                  \


Reply via email to