wgtmac commented on code in PR #484:
URL: https://github.com/apache/iceberg-cpp/pull/484#discussion_r2660111594


##########
src/iceberg/catalog/rest/types.h:
##########
@@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
   bool operator==(const ListTablesResponse&) const = default;
 };
 
+/// \brief Request to commit changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableRequest {
+  TableIdentifier identifier;
+  std::vector<std::shared_ptr<TableRequirement>> requirements;
+  std::vector<std::shared_ptr<TableUpdate>> updates;
+
+  /// \brief Validates the CommitTableRequest.
+  Status Validate() const { return {}; }
+
+  bool operator==(const CommitTableRequest& other) const;
+};
+
+/// \brief Response from committing changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableResponse {
+  std::string metadata_location;

Review Comment:
   This is also required, isn't it?



##########
src/iceberg/json_internal.h:
##########
@@ -357,4 +359,46 @@ ICEBERG_EXPORT nlohmann::json ToJson(const Namespace& ns);
 /// \return A `Namespace` object or an error if the conversion fails.
 ICEBERG_EXPORT Result<Namespace> NamespaceFromJson(const nlohmann::json& json);
 
+/// \brief Serializes a `TableUpdate` object to JSON.
+///
+/// \param[in] update The `TableUpdate` object to be serialized.

Review Comment:
   ```suggestion
   /// \param update The `TableUpdate` object to be serialized.
   ```
   
   Only add the `[in]` direction tag when a function mixes input and output 
parameters. By default, all parameters are treated as `in`.



##########
src/iceberg/catalog/rest/types.h:
##########
@@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
   bool operator==(const ListTablesResponse&) const = default;
 };
 
+/// \brief Request to commit changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableRequest {
+  TableIdentifier identifier;
+  std::vector<std::shared_ptr<TableRequirement>> requirements;
+  std::vector<std::shared_ptr<TableUpdate>> updates;
+
+  /// \brief Validates the CommitTableRequest.
+  Status Validate() const { return {}; }
+
+  bool operator==(const CommitTableRequest& other) const;
+};
+
+/// \brief Response from committing changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableResponse {
+  std::string metadata_location;
+  std::shared_ptr<TableMetadata> metadata;  // required
+  // TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> 
storage_credential;

Review Comment:
   ```suggestion
   ```
   
   A copy-and-paste error?



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());
+      json[kLastColumnId] = u.last_column_id();
+      break;
+    }
+    case TableUpdate::Kind::kSetCurrentSchema: {
+      const auto& u = static_cast<const table::SetCurrentSchema&>(update);
+      json[kAction] = kActionSetCurrentSchema;
+      json[kSchemaId] = u.schema_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddPartitionSpec: {
+      const auto& u = static_cast<const table::AddPartitionSpec&>(update);
+      json[kAction] = kActionAddPartitionSpec;
+      json[kSpec] = ToJson(*u.spec());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultPartitionSpec: {
+      const auto& u = static_cast<const 
table::SetDefaultPartitionSpec&>(update);
+      json[kAction] = kActionSetDefaultPartitionSpec;
+      json[kSpecId] = u.spec_id();
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionSpecs: {
+      const auto& u = static_cast<const table::RemovePartitionSpecs&>(update);
+      json[kAction] = kActionRemovePartitionSpecs;
+      json[kSpecIds] = u.spec_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSchemas: {
+      const auto& u = static_cast<const table::RemoveSchemas&>(update);
+      json[kAction] = kActionRemoveSchemas;
+      json[kSchemaIds] = u.schema_ids();
+      break;
+    }
+    case TableUpdate::Kind::kAddSortOrder: {
+      const auto& u = static_cast<const table::AddSortOrder&>(update);
+      json[kAction] = kActionAddSortOrder;
+      json[kSortOrder] = ToJson(*u.sort_order());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultSortOrder: {
+      const auto& u = static_cast<const table::SetDefaultSortOrder&>(update);
+      json[kAction] = kActionSetDefaultSortOrder;
+      json[kSortOrderId] = u.sort_order_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddSnapshot: {
+      const auto& u = static_cast<const table::AddSnapshot&>(update);
+      json[kAction] = kActionAddSnapshot;
+      json[kSnapshot] = ToJson(*u.snapshot());
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshots: {
+      const auto& u = static_cast<const table::RemoveSnapshots&>(update);
+      json[kAction] = kActionRemoveSnapshots;
+      json[kSnapshotIds] = u.snapshot_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshotRef: {
+      const auto& u = static_cast<const table::RemoveSnapshotRef&>(update);
+      json[kAction] = kActionRemoveSnapshotRef;
+      json[kRefName] = u.ref_name();
+      break;
+    }
+    case TableUpdate::Kind::kSetSnapshotRef: {
+      const auto& u = static_cast<const table::SetSnapshotRef&>(update);
+      json[kAction] = kActionSetSnapshotRef;
+      json[kRefName] = u.ref_name();
+      json[kSnapshotId] = u.snapshot_id();
+      json[kType] = ToString(u.type());
+      if (u.min_snapshots_to_keep().has_value()) {
+        json[kMinSnapshotsToKeep] = u.min_snapshots_to_keep().value();
+      }
+      if (u.max_snapshot_age_ms().has_value()) {
+        json[kMaxSnapshotAgeMs] = u.max_snapshot_age_ms().value();
+      }
+      if (u.max_ref_age_ms().has_value()) {
+        json[kMaxRefAgeMs] = u.max_ref_age_ms().value();
+      }
+      break;
+    }
+    case TableUpdate::Kind::kSetProperties: {
+      const auto& u = static_cast<const table::SetProperties&>(update);
+      json[kAction] = kActionSetProperties;
+      json[kUpdates] = u.updated();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveProperties: {
+      const auto& u = static_cast<const table::RemoveProperties&>(update);
+      json[kAction] = kActionRemoveProperties;
+      json[kRemovals] = std::vector<std::string>(u.removed().begin(), 
u.removed().end());
+      break;
+    }
+    case TableUpdate::Kind::kSetLocation: {
+      const auto& u = static_cast<const table::SetLocation&>(update);
+      json[kAction] = kActionSetLocation;
+      json[kLocation] = u.location();
+      break;
+    }
+  }
+  return json;
+}
+
+nlohmann::json ToJson(const TableRequirement& requirement) {
+  nlohmann::json json;
+
+  if (dynamic_cast<const table::AssertDoesNotExist*>(&requirement)) {
+    json[kType] = kRequirementAssertDoesNotExist;
+  } else if (auto* r = dynamic_cast<const table::AssertUUID*>(&requirement)) {
+    json[kType] = kRequirementAssertUUID;
+    json[kUUID] = r->uuid();
+  } else if (auto* r = dynamic_cast<const 
table::AssertRefSnapshotID*>(&requirement)) {
+    json[kType] = kRequirementAssertRefSnapshotID;
+    json[kRefName] = r->ref_name();
+    if (r->snapshot_id().has_value()) {
+      json[kSnapshotId] = r->snapshot_id().value();
+    } else {
+      json[kSnapshotId] = nullptr;
+    }
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertLastAssignedFieldId*>(&requirement)) {
+    json[kType] = kRequirementAssertLastAssignedFieldId;
+    json[kLastAssignedFieldId] = r->last_assigned_field_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertCurrentSchemaID*>(&requirement)) {
+    json[kType] = kRequirementAssertCurrentSchemaID;
+    json[kCurrentSchemaId] = r->schema_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertLastAssignedPartitionId*>(
+                 &requirement)) {
+    json[kType] = kRequirementAssertLastAssignedPartitionId;
+    json[kLastAssignedPartitionId] = r->last_assigned_partition_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertDefaultSpecID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSpecID;
+    json[kDefaultSpecId] = r->spec_id();
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertDefaultSortOrderID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSortOrderID;
+    json[kDefaultSortOrderId] = r->sort_order_id();
+  }
+
+  return json;
+}
+
+Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(
+    const nlohmann::json& json, const std::shared_ptr<Schema>& schema) {
+  ICEBERG_ASSIGN_OR_RAISE(auto action, GetJsonValue<std::string>(json, 
kAction));
+
+  if (action == kActionAssignUUID) {
+    ICEBERG_ASSIGN_OR_RAISE(auto uuid, GetJsonValue<std::string>(json, kUUID));
+    return std::make_unique<table::AssignUUID>(std::move(uuid));
+  }
+  if (action == kActionUpgradeFormatVersion) {
+    ICEBERG_ASSIGN_OR_RAISE(auto format_version,
+                            GetJsonValue<int8_t>(json, kFormatVersion));
+    return std::make_unique<table::UpgradeFormatVersion>(format_version);
+  }
+  if (action == kActionAddSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_json,
+                            GetJsonValue<nlohmann::json>(json, kSchema));
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed_schema, SchemaFromJson(schema_json));
+    ICEBERG_ASSIGN_OR_RAISE(auto last_column_id,
+                            GetJsonValue<int32_t>(json, kLastColumnId));
+    return std::make_unique<table::AddSchema>(std::move(parsed_schema), 
last_column_id);
+  }
+  if (action == kActionSetCurrentSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, 
kSchemaId));
+    return std::make_unique<table::SetCurrentSchema>(schema_id);
+  }
+  if (action == kActionAddPartitionSpec) {
+    if (!schema) {
+      return NotSupported(
+          "Cannot parse AddPartitionSpec without schema context. Use the 
overload "
+          "that takes a schema parameter.");
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_json, GetJsonValue<nlohmann::json>(json, 
kSpec));
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id_opt,
+                            GetJsonValueOptional<int32_t>(spec_json, kSpecId));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec,
+        PartitionSpecFromJson(schema, spec_json,
+                              
spec_id_opt.value_or(PartitionSpec::kInitialSpecId)));
+    return std::make_unique<table::AddPartitionSpec>(std::move(spec));
+  }
+  if (action == kActionSetDefaultPartitionSpec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, 
kSpecId));
+    return std::make_unique<table::SetDefaultPartitionSpec>(spec_id);
+  }
+  if (action == kActionRemovePartitionSpecs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_ids,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSpecIds));
+    return std::make_unique<table::RemovePartitionSpecs>(std::move(spec_ids));
+  }
+  if (action == kActionRemoveSchemas) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_ids_vec,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSchemaIds));
+    std::unordered_set<int32_t> schema_ids(schema_ids_vec.begin(), 
schema_ids_vec.end());
+    return std::make_unique<table::RemoveSchemas>(std::move(schema_ids));
+  }
+  if (action == kActionAddSortOrder) {
+    if (!schema) {
+      return NotSupported(
+          "Cannot parse AddSortOrder without schema context. Use the overload "
+          "that takes a schema parameter.");
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_json,
+                            GetJsonValue<nlohmann::json>(json, kSortOrder));
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order, 
SortOrderFromJson(sort_order_json, schema));
+    return std::make_unique<table::AddSortOrder>(std::move(sort_order));
+  }
+  if (action == kActionSetDefaultSortOrder) {
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_id,
+                            GetJsonValue<int32_t>(json, kSortOrderId));
+    return std::make_unique<table::SetDefaultSortOrder>(sort_order_id);
+  }
+  if (action == kActionAddSnapshot) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_json,
+                            GetJsonValue<nlohmann::json>(json, kSnapshot));
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot, SnapshotFromJson(snapshot_json));
+    return std::make_unique<table::AddSnapshot>(std::move(snapshot));
+  }
+  if (action == kActionRemoveSnapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_ids,
+                            GetJsonValue<std::vector<int64_t>>(json, 
kSnapshotIds));
+    return std::make_unique<table::RemoveSnapshots>(std::move(snapshot_ids));
+  }
+  if (action == kActionRemoveSnapshotRef) {
+    ICEBERG_ASSIGN_OR_RAISE(auto ref_name, GetJsonValue<std::string>(json, 
kRefName));
+    return std::make_unique<table::RemoveSnapshotRef>(std::move(ref_name));
+  }
+  if (action == kActionSetSnapshotRef) {
+    ICEBERG_ASSIGN_OR_RAISE(auto ref_name, GetJsonValue<std::string>(json, 
kRefName));
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, 
kSnapshotId));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto type,
+        GetJsonValue<std::string>(json, 
kType).and_then(SnapshotRefTypeFromString));
+    ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots,
+                            GetJsonValueOptional<int32_t>(json, 
kMinSnapshotsToKeep));
+    ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age,
+                            GetJsonValueOptional<int64_t>(json, 
kMaxSnapshotAgeMs));
+    ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
+                            GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
+    return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), 
snapshot_id, type,
+                                                   min_snapshots, 
max_snapshot_age,
+                                                   max_ref_age);
+  }
+  if (action == kActionSetProperties) {
+    using StringMap = std::unordered_map<std::string, std::string>;
+    ICEBERG_ASSIGN_OR_RAISE(auto updates, GetJsonValue<StringMap>(json, 
kUpdates));
+    return std::make_unique<table::SetProperties>(std::move(updates));
+  }
+  if (action == kActionRemoveProperties) {
+    ICEBERG_ASSIGN_OR_RAISE(auto removals_vec,
+                            GetJsonValue<std::vector<std::string>>(json, 
kRemovals));
+    std::unordered_set<std::string> removals(removals_vec.begin(), 
removals_vec.end());

Review Comment:
   ```suggestion
       std::unordered_set<std::string> 
removals(std::make_move_iterator(removals_vec.begin()), 
std::make_move_iterator(removals_vec.end()));
   ```



##########
src/iceberg/table_requirement.h:
##########
@@ -78,6 +78,11 @@ class ICEBERG_EXPORT AssertDoesNotExist : public 
TableRequirement {
   Kind kind() const override { return Kind::kAssertDoesNotExist; }
 
   Status Validate(const TableMetadata* base) const override;
+
+  /// \brief Compare two AssertDoesNotExist for equality
+  friend bool operator==(const AssertDoesNotExist&, const AssertDoesNotExist&) 
{
+    return true;  // No fields to compare
+  }

Review Comment:
   ```suggestion
     friend bool operator==(const AssertDoesNotExist&, const 
AssertDoesNotExist&) = default;
   ```
   
   The same applies to the similar definitions below.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -350,9 +403,26 @@ Result<std::shared_ptr<Table>> 
RestCatalog::LoadTable(const TableIdentifier& ide
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const std::string& metadata_file_location) {
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier, const std::string& 
metadata_file_location) {
+  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
+  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
+
+  RegisterTableRequest request{
+      .name = identifier.name,
+      .metadata_location = metadata_file_location,
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
+  ICEBERG_ASSIGN_OR_RAISE(
+      const auto response,
+      client_->Post(path, json_request, /*headers=*/{}, 
*TableErrorHandler::Instance()));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
+  ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
+  auto non_const_catalog = 
std::const_pointer_cast<RestCatalog>(shared_from_this());
+  return Table::Make(identifier, load_result.metadata,
+                     std::move(load_result.metadata_location), file_io_,
+                     non_const_catalog);

Review Comment:
   ```suggestion
     return Table::Make(identifier, std::move(load_result.metadata),
                        std::move(load_result.metadata_location), file_io_,
                        shared_from_this());
   ```



##########
src/iceberg/test/json_internal_test.cc:
##########
@@ -293,4 +295,674 @@ TEST(JsonInternalTest, NameMapping) {
   TestJsonConversion(*mapping, expected_json);
 }
 
+// TableUpdate JSON Serialization/Deserialization Tests
+TEST(TableUpdateJsonTest, AssignUUIDToJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  table::AssignUUID update(uuid);
+  nlohmann::json expected =
+      
R"({"action":"assign-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected) << "AssignUUID should convert to the correct JSON 
value";
+}
+
+TEST(TableUpdateJsonTest, AssignUUIDFromJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  nlohmann::json json =
+      
R"({"action":"assign-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+  table::AssignUUID expected(uuid);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kAssignUUID);
+  auto* actual = dynamic_cast<table::AssignUUID*>(parsed.value().get());
+  EXPECT_EQ(actual->uuid(), expected.uuid()) << "UUID should parse correctly 
from JSON";
+}
+
+TEST(TableUpdateJsonTest, UpgradeFormatVersionToJson) {
+  int32_t format_version = 2;
+  table::UpgradeFormatVersion update(format_version);
+  nlohmann::json expected =
+      R"({"action":"upgrade-format-version","format-version":2})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "UpgradeFormatVersion should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, UpgradeFormatVersionFromJson) {
+  int32_t format_version = 2;
+  nlohmann::json json = 
R"({"action":"upgrade-format-version","format-version":2})"_json;
+  table::UpgradeFormatVersion expected(format_version);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kUpgradeFormatVersion);
+  auto* actual = 
dynamic_cast<table::UpgradeFormatVersion*>(parsed.value().get());
+  EXPECT_EQ(actual->format_version(), expected.format_version())
+      << "Format version should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, AddSchemaToJson) {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField(1, "id", int64(), false),
+                               SchemaField(2, "name", string(), true)},
+      /*schema_id=*/1);
+  table::AddSchema update(schema, 2);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "add-schema");
+  EXPECT_EQ(json["last-column-id"], 2);
+  EXPECT_TRUE(json.contains("schema")) << "AddSchema should include schema in 
JSON";
+}
+
+TEST(TableUpdateJsonTest, AddSchemaFromJson) {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField(1, "id", int64(), false),
+                               SchemaField(2, "name", string(), true)},
+      /*schema_id=*/1);
+  table::AddSchema expected(schema, 2);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kAddSchema);
+  auto* actual = dynamic_cast<table::AddSchema*>(parsed.value().get());
+  EXPECT_EQ(actual->last_column_id(), expected.last_column_id())
+      << "Last column ID should parse correctly from JSON";
+  EXPECT_EQ(actual->schema()->schema_id(), schema->schema_id())
+      << "Schema ID should parse correctly from JSON";
+  EXPECT_EQ(actual->schema()->fields().size(), 2)
+      << "Schema fields should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetCurrentSchemaToJson) {
+  int32_t schema_id = 1;
+  table::SetCurrentSchema update(schema_id);
+  nlohmann::json expected = 
R"({"action":"set-current-schema","schema-id":1})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "SetCurrentSchema should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, SetCurrentSchemaFromJson) {
+  int32_t schema_id = 1;
+  nlohmann::json json = 
R"({"action":"set-current-schema","schema-id":1})"_json;
+  table::SetCurrentSchema expected(schema_id);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetCurrentSchema);
+  auto* actual = dynamic_cast<table::SetCurrentSchema*>(parsed.value().get());
+  EXPECT_EQ(actual->schema_id(), expected.schema_id())
+      << "Schema ID should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, AddPartitionSpecRequiresSchema) {
+  nlohmann::json json = 
R"({"action":"add-spec","spec":{"spec-id":1,"fields":[]}})"_json;
+
+  auto result = TableUpdateFromJson(json);
+  EXPECT_THAT(result, IsError(ErrorKind::kNotSupported))
+      << "AddPartitionSpec without schema should fail";
+}
+
+TEST(TableUpdateJsonTest, AddPartitionSpecWithSchema) {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField(1, "id", int64(), false),
+                               SchemaField(2, "region", string(), true)},
+      /*schema_id=*/1);
+
+  nlohmann::json json = R"({
+    "action": "add-spec",
+    "spec": {
+      "spec-id": 1,
+      "fields": [
+        {"source-id": 2, "field-id": 1000, "transform": "identity", "name": 
"region_partition"}
+      ]
+    }
+  })"_json;
+
+  auto result = TableUpdateFromJson(json, schema);
+  ASSERT_THAT(result, IsOk());
+  EXPECT_EQ(result.value()->kind(), TableUpdate::Kind::kAddPartitionSpec);
+  auto* parsed = dynamic_cast<table::AddPartitionSpec*>(result.value().get());
+  EXPECT_EQ(parsed->spec()->spec_id(), 1) << "Spec ID should parse correctly 
from JSON";
+  EXPECT_EQ(parsed->spec()->fields().size(), 1)
+      << "Spec fields should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetDefaultPartitionSpecToJson) {
+  int32_t spec_id = 2;
+  table::SetDefaultPartitionSpec update(spec_id);
+  nlohmann::json expected = 
R"({"action":"set-default-spec","spec-id":2})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "SetDefaultPartitionSpec should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, SetDefaultPartitionSpecFromJson) {
+  int32_t spec_id = 2;
+  nlohmann::json json = R"({"action":"set-default-spec","spec-id":2})"_json;
+  table::SetDefaultPartitionSpec expected(spec_id);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableUpdate::Kind::kSetDefaultPartitionSpec);
+  auto* actual = 
dynamic_cast<table::SetDefaultPartitionSpec*>(parsed.value().get());
+  EXPECT_EQ(actual->spec_id(), expected.spec_id())
+      << "Spec ID should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, RemovePartitionSpecsToJson) {
+  std::vector<int32_t> spec_ids = {1, 2, 3};
+  table::RemovePartitionSpecs update(spec_ids);
+  nlohmann::json expected =
+      R"({"action":"remove-partition-specs","spec-ids":[1,2,3]})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "RemovePartitionSpecs should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, RemovePartitionSpecsFromJson) {
+  std::vector<int32_t> spec_ids = {1, 2, 3};
+  nlohmann::json json = 
R"({"action":"remove-partition-specs","spec-ids":[1,2,3]})"_json;
+  table::RemovePartitionSpecs expected(spec_ids);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kRemovePartitionSpecs);
+  auto* actual = 
dynamic_cast<table::RemovePartitionSpecs*>(parsed.value().get());
+  EXPECT_EQ(actual->spec_ids(), expected.spec_ids())
+      << "Spec IDs should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, RemoveSchemasToJson) {
+  std::unordered_set<int32_t> schema_ids = {1, 2};
+  table::RemoveSchemas update(schema_ids);
+  nlohmann::json expected = 
R"({"action":"remove-schemas","schema-ids":[1,2]})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "remove-schemas");
+}
+
+TEST(TableUpdateJsonTest, RemoveSchemasFromJson) {
+  std::unordered_set<int32_t> schema_ids = {1, 2};
+  nlohmann::json json = 
R"({"action":"remove-schemas","schema-ids":[1,2]})"_json;
+  table::RemoveSchemas expected(schema_ids);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kRemoveSchemas);
+  auto* actual = dynamic_cast<table::RemoveSchemas*>(parsed.value().get());
+  EXPECT_EQ(actual->schema_ids(), expected.schema_ids())
+      << "Schema IDs should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, AddSortOrderRequiresSchema) {
+  nlohmann::json json =
+      
R"({"action":"add-sort-order","sort-order":{"order-id":1,"fields":[]}})"_json;
+
+  auto result = TableUpdateFromJson(json);
+  EXPECT_THAT(result, IsError(ErrorKind::kNotSupported))
+      << "AddSortOrder without schema should fail";
+}
+
+TEST(TableUpdateJsonTest, SetDefaultSortOrderToJson) {
+  int32_t sort_order_id = 1;
+  table::SetDefaultSortOrder update(sort_order_id);
+  nlohmann::json expected =
+      R"({"action":"set-default-sort-order","sort-order-id":1})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "SetDefaultSortOrder should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, SetDefaultSortOrderFromJson) {
+  int32_t sort_order_id = 1;
+  nlohmann::json json = 
R"({"action":"set-default-sort-order","sort-order-id":1})"_json;
+  table::SetDefaultSortOrder expected(sort_order_id);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetDefaultSortOrder);
+  auto* actual = 
dynamic_cast<table::SetDefaultSortOrder*>(parsed.value().get());
+  EXPECT_EQ(actual->sort_order_id(), expected.sort_order_id())
+      << "Sort order ID should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, AddSnapshotToJson) {
+  auto snapshot = std::make_shared<Snapshot>(
+      Snapshot{.snapshot_id = 123456789,
+               .parent_snapshot_id = 987654321,
+               .sequence_number = 5,
+               .timestamp_ms = TimePointMsFromUnixMs(1234567890000).value(),
+               .manifest_list = "/path/to/manifest-list.avro",
+               .summary = {{SnapshotSummaryFields::kOperation, 
DataOperation::kAppend}},
+               .schema_id = 1});
+  table::AddSnapshot update(snapshot);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "add-snapshot");
+  EXPECT_TRUE(json.contains("snapshot")) << "AddSnapshot should include 
snapshot in JSON";
+}
+
+TEST(TableUpdateJsonTest, AddSnapshotFromJson) {
+  auto snapshot = std::make_shared<Snapshot>(
+      Snapshot{.snapshot_id = 123456789,
+               .parent_snapshot_id = 987654321,
+               .sequence_number = 5,
+               .timestamp_ms = TimePointMsFromUnixMs(1234567890000).value(),
+               .manifest_list = "/path/to/manifest-list.avro",
+               .summary = {{SnapshotSummaryFields::kOperation, 
DataOperation::kAppend}},
+               .schema_id = 1});
+  table::AddSnapshot expected(snapshot);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kAddSnapshot);
+  auto* actual = dynamic_cast<table::AddSnapshot*>(parsed.value().get());
+  EXPECT_EQ(actual->snapshot()->snapshot_id, snapshot->snapshot_id)
+      << "Snapshot ID should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, RemoveSnapshotsToJson) {
+  std::vector<int64_t> snapshot_ids = {111, 222, 333};
+  table::RemoveSnapshots update(snapshot_ids);
+  nlohmann::json expected =
+      R"({"action":"remove-snapshots","snapshot-ids":[111,222,333]})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected) << "RemoveSnapshots should convert to the correct 
JSON value";
+}
+
+TEST(TableUpdateJsonTest, RemoveSnapshotsFromJson) {
+  std::vector<int64_t> snapshot_ids = {111, 222, 333};
+  nlohmann::json json =
+      R"({"action":"remove-snapshots","snapshot-ids":[111,222,333]})"_json;
+  table::RemoveSnapshots expected(snapshot_ids);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kRemoveSnapshots);
+  auto* actual = dynamic_cast<table::RemoveSnapshots*>(parsed.value().get());
+  EXPECT_EQ(actual->snapshot_ids(), expected.snapshot_ids())
+      << "Snapshot IDs should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, RemoveSnapshotRefToJson) {
+  std::string ref_name = "my-branch";
+  table::RemoveSnapshotRef update(ref_name);
+  nlohmann::json expected =
+      R"({"action":"remove-snapshot-ref","ref-name":"my-branch"})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected)
+      << "RemoveSnapshotRef should convert to the correct JSON value";
+}
+
+TEST(TableUpdateJsonTest, RemoveSnapshotRefFromJson) {
+  std::string ref_name = "my-branch";
+  nlohmann::json json = 
R"({"action":"remove-snapshot-ref","ref-name":"my-branch"})"_json;
+  table::RemoveSnapshotRef expected(ref_name);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kRemoveSnapshotRef);
+  auto* actual = dynamic_cast<table::RemoveSnapshotRef*>(parsed.value().get());
+  EXPECT_EQ(actual->ref_name(), expected.ref_name())
+      << "Ref name should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetSnapshotRefBranchToJson) {
+  table::SetSnapshotRef update("main", 123456789, SnapshotRefType::kBranch, 5, 
86400000,
+                               604800000);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "set-snapshot-ref");
+  EXPECT_EQ(json["ref-name"], "main");
+  EXPECT_EQ(json["snapshot-id"], 123456789);
+  EXPECT_EQ(json["type"], "branch");
+}
+
+TEST(TableUpdateJsonTest, SetSnapshotRefBranchFromJson) {
+  table::SetSnapshotRef expected("main", 123456789, SnapshotRefType::kBranch, 
5, 86400000,
+                                 604800000);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetSnapshotRef);
+  auto* actual = dynamic_cast<table::SetSnapshotRef*>(parsed.value().get());
+  EXPECT_EQ(actual->ref_name(), expected.ref_name())
+      << "Ref name should parse correctly from JSON";
+  EXPECT_EQ(actual->snapshot_id(), expected.snapshot_id())
+      << "Snapshot ID should parse correctly from JSON";
+  EXPECT_EQ(actual->type(), expected.type()) << "Type should parse correctly 
from JSON";
+  EXPECT_EQ(actual->min_snapshots_to_keep(), expected.min_snapshots_to_keep())
+      << "Min snapshots to keep should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetSnapshotRefTagToJson) {
+  table::SetSnapshotRef update("release-1.0", 987654321, 
SnapshotRefType::kTag);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "set-snapshot-ref");
+  EXPECT_EQ(json["type"], "tag");
+}
+
+TEST(TableUpdateJsonTest, SetSnapshotRefTagFromJson) {
+  table::SetSnapshotRef expected("release-1.0", 987654321, 
SnapshotRefType::kTag);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetSnapshotRef);
+  auto* actual = dynamic_cast<table::SetSnapshotRef*>(parsed.value().get());
+  EXPECT_EQ(actual->type(), SnapshotRefType::kTag)
+      << "Type should parse correctly as tag from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetPropertiesToJson) {
+  std::unordered_map<std::string, std::string> props = {{"key1", "value1"},
+                                                        {"key2", "value2"}};
+  table::SetProperties update(props);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "set-properties");
+  EXPECT_TRUE(json.contains("updates")) << "SetProperties should include 
updates in JSON";
+}
+
+TEST(TableUpdateJsonTest, SetPropertiesFromJson) {
+  std::unordered_map<std::string, std::string> props = {{"key1", "value1"},
+                                                        {"key2", "value2"}};
+  table::SetProperties expected(props);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetProperties);
+  auto* actual = dynamic_cast<table::SetProperties*>(parsed.value().get());
+  EXPECT_EQ(actual->updated(), expected.updated())
+      << "Properties should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, RemovePropertiesToJson) {
+  std::unordered_set<std::string> props = {"key1", "key2"};
+  table::RemoveProperties update(props);
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json["action"], "remove-properties");
+  EXPECT_TRUE(json.contains("removals"))
+      << "RemoveProperties should include removals in JSON";
+}
+
+TEST(TableUpdateJsonTest, RemovePropertiesFromJson) {
+  std::unordered_set<std::string> props = {"key1", "key2"};
+  table::RemoveProperties expected(props);
+  auto json = ToJson(expected);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kRemoveProperties);
+  auto* actual = dynamic_cast<table::RemoveProperties*>(parsed.value().get());
+  EXPECT_EQ(actual->removed(), expected.removed())
+      << "Removed properties should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, SetLocationToJson) {
+  std::string location = "s3://bucket/warehouse/table";
+  table::SetLocation update(location);
+  nlohmann::json expected =
+      
R"({"action":"set-location","location":"s3://bucket/warehouse/table"})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected) << "SetLocation should convert to the correct JSON 
value";
+}
+
+TEST(TableUpdateJsonTest, SetLocationFromJson) {
+  std::string location = "s3://bucket/warehouse/table";
+  nlohmann::json json =
+      
R"({"action":"set-location","location":"s3://bucket/warehouse/table"})"_json;
+  table::SetLocation expected(location);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kSetLocation);
+  auto* actual = dynamic_cast<table::SetLocation*>(parsed.value().get());
+  EXPECT_EQ(actual->location(), expected.location())
+      << "Location should parse correctly from JSON";
+}
+
+TEST(TableUpdateJsonTest, UnknownAction) {
+  nlohmann::json json = R"({"action":"unknown-action"})"_json;
+  auto result = TableUpdateFromJson(json);
+  EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError));
+  EXPECT_THAT(result, HasErrorMessage("Unknown table update action"));
+}
+
+// TableRequirement JSON Serialization/Deserialization Tests
+TEST(TableRequirementJsonTest, AssertDoesNotExistToJson) {
+  table::AssertDoesNotExist req;
+  nlohmann::json expected = R"({"type":"assert-create"})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertDoesNotExist should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertDoesNotExistFromJson) {
+  nlohmann::json json = R"({"type":"assert-create"})"_json;
+  table::AssertDoesNotExist expected;
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertDoesNotExist);
+}
+
+TEST(TableRequirementJsonTest, AssertUUIDToJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  table::AssertUUID req(uuid);
+  nlohmann::json expected =
+      
R"({"type":"assert-table-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected) << "AssertUUID should convert to the correct JSON 
value";
+}
+
+TEST(TableRequirementJsonTest, AssertUUIDFromJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  nlohmann::json json =
+      
R"({"type":"assert-table-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+  table::AssertUUID expected(uuid);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableRequirement::Kind::kAssertUUID);
+  auto* actual = dynamic_cast<table::AssertUUID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected) << "UUID should parse correctly from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertRefSnapshotIDToJson) {
+  std::string ref_name = "main";
+  int64_t snapshot_id = 123456789;
+  table::AssertRefSnapshotID req(ref_name, snapshot_id);
+  nlohmann::json expected =
+      
R"({"type":"assert-ref-snapshot-id","ref-name":"main","snapshot-id":123456789})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertRefSnapshotID should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertRefSnapshotIDFromJson) {
+  std::string ref_name = "main";
+  int64_t snapshot_id = 123456789;
+  nlohmann::json json =
+      
R"({"type":"assert-ref-snapshot-id","ref-name":"main","snapshot-id":123456789})"_json;
+  table::AssertRefSnapshotID expected(ref_name, snapshot_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertRefSnapshotID);
+  auto* actual = 
dynamic_cast<table::AssertRefSnapshotID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected) << "AssertRefSnapshotID should parse correctly 
from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertRefSnapshotIDToJsonWithNull) {
+  std::string ref_name = "main";
+  table::AssertRefSnapshotID req(ref_name, std::nullopt);
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json["snapshot-id"], nullptr);
+}
+
+TEST(TableRequirementJsonTest, AssertRefSnapshotIDFromJsonWithNull) {
+  std::string ref_name = "main";
+  nlohmann::json json =
+      
R"({"type":"assert-ref-snapshot-id","ref-name":"main","snapshot-id":null})"_json;
+  table::AssertRefSnapshotID expected(ref_name, std::nullopt);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  auto* actual = 
dynamic_cast<table::AssertRefSnapshotID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected) << "AssertRefSnapshotID with null should parse 
correctly";
+}
+
+TEST(TableRequirementJsonTest, AssertLastAssignedFieldIdToJson) {
+  int32_t last_assigned_field_id = 100;
+  table::AssertLastAssignedFieldId req(last_assigned_field_id);
+  nlohmann::json expected =
+      
R"({"type":"assert-last-assigned-field-id","last-assigned-field-id":100})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertLastAssignedFieldId should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertLastAssignedFieldIdFromJson) {
+  int32_t last_assigned_field_id = 100;
+  nlohmann::json json =
+      
R"({"type":"assert-last-assigned-field-id","last-assigned-field-id":100})"_json;
+  table::AssertLastAssignedFieldId expected(last_assigned_field_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertLastAssignedFieldId);
+  auto* actual = 
dynamic_cast<table::AssertLastAssignedFieldId*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected)
+      << "AssertLastAssignedFieldId should parse correctly from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertCurrentSchemaIDToJson) {
+  int32_t schema_id = 1;
+  table::AssertCurrentSchemaID req(schema_id);
+  nlohmann::json expected =
+      R"({"type":"assert-current-schema-id","current-schema-id":1})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertCurrentSchemaID should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertCurrentSchemaIDFromJson) {
+  int32_t schema_id = 1;
+  nlohmann::json json =
+      R"({"type":"assert-current-schema-id","current-schema-id":1})"_json;
+  table::AssertCurrentSchemaID expected(schema_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertCurrentSchemaID);
+  auto* actual = 
dynamic_cast<table::AssertCurrentSchemaID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected)
+      << "AssertCurrentSchemaID should parse correctly from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertLastAssignedPartitionIdToJson) {
+  int32_t last_assigned_partition_id = 1000;
+  table::AssertLastAssignedPartitionId req(last_assigned_partition_id);
+  nlohmann::json expected =
+      
R"({"type":"assert-last-assigned-partition-id","last-assigned-partition-id":1000})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertLastAssignedPartitionId should convert to the correct JSON 
value";
+}
+
+TEST(TableRequirementJsonTest, AssertLastAssignedPartitionIdFromJson) {
+  int32_t last_assigned_partition_id = 1000;
+  nlohmann::json json =
+      
R"({"type":"assert-last-assigned-partition-id","last-assigned-partition-id":1000})"_json;
+  table::AssertLastAssignedPartitionId expected(last_assigned_partition_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(),
+            TableRequirement::Kind::kAssertLastAssignedPartitionId);
+  auto* actual =
+      
dynamic_cast<table::AssertLastAssignedPartitionId*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected)
+      << "AssertLastAssignedPartitionId should parse correctly from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertDefaultSpecIDToJson) {
+  int32_t spec_id = 0;
+  table::AssertDefaultSpecID req(spec_id);
+  nlohmann::json expected =
+      R"({"type":"assert-default-spec-id","default-spec-id":0})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertDefaultSpecID should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertDefaultSpecIDFromJson) {
+  int32_t spec_id = 0;
+  nlohmann::json json = 
R"({"type":"assert-default-spec-id","default-spec-id":0})"_json;
+  table::AssertDefaultSpecID expected(spec_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertDefaultSpecID);
+  auto* actual = 
dynamic_cast<table::AssertDefaultSpecID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected) << "AssertDefaultSpecID should parse correctly 
from JSON";
+}
+
+TEST(TableRequirementJsonTest, AssertDefaultSortOrderIDToJson) {
+  int32_t sort_order_id = 0;
+  table::AssertDefaultSortOrderID req(sort_order_id);
+  nlohmann::json expected =
+      
R"({"type":"assert-default-sort-order-id","default-sort-order-id":0})"_json;
+
+  auto json = ToJson(req);
+  EXPECT_EQ(json, expected)
+      << "AssertDefaultSortOrderID should convert to the correct JSON value";
+}
+
+TEST(TableRequirementJsonTest, AssertDefaultSortOrderIDFromJson) {
+  int32_t sort_order_id = 0;
+  nlohmann::json json =
+      
R"({"type":"assert-default-sort-order-id","default-sort-order-id":0})"_json;
+  table::AssertDefaultSortOrderID expected(sort_order_id);
+
+  auto parsed = TableRequirementFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), 
TableRequirement::Kind::kAssertDefaultSortOrderID);
+  auto* actual = 
dynamic_cast<table::AssertDefaultSortOrderID*>(parsed.value().get());
+  EXPECT_EQ(*actual, expected)
+      << "AssertDefaultSortOrderID should parse correctly from JSON";
+}
+
+TEST(TableRequirementJsonTest, UnknownType) {

Review Comment:
   Can you port `TestMetadataUpdateParser.java` and 
`TestUpdateTableRequestParser.java` as much as possible?



##########
src/iceberg/catalog/rest/types.h:
##########
@@ -29,6 +29,8 @@
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
 #include "iceberg/table_identifier.h"
+#include "iceberg/table_requirement.h"
+#include "iceberg/table_update.h"

Review Comment:
   ```suggestion
   ```
   
   Use a forward declaration instead of including these headers.



##########
src/iceberg/catalog/rest/types.h:
##########
@@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
   bool operator==(const ListTablesResponse&) const = default;
 };
 
+/// \brief Request to commit changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableRequest {
+  TableIdentifier identifier;
+  std::vector<std::shared_ptr<TableRequirement>> requirements;
+  std::vector<std::shared_ptr<TableUpdate>> updates;
+
+  /// \brief Validates the CommitTableRequest.
+  Status Validate() const { return {}; }
+
+  bool operator==(const CommitTableRequest& other) const;
+};
+
+/// \brief Response from committing changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableResponse {
+  std::string metadata_location;
+  std::shared_ptr<TableMetadata> metadata;  // required
+  // TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> 
storage_credential;
+
+  /// \brief Validates the CommitTableResponse.
+  Status Validate() const {
+    if (!metadata) {
+      return Invalid("Invalid metadata: null");

Review Comment:
   We need to use `ValidationFailed` in all `Validate()` functions. BTW, here we
should also check that `metadata_location` is not empty.



##########
src/iceberg/catalog/rest/types.cc:
##########
@@ -69,4 +69,32 @@ bool LoadTableResult::operator==(const LoadTableResult& 
other) const {
   return true;
 }
 
+bool CommitTableRequest::operator==(const CommitTableRequest& other) const {
+  if (identifier != other.identifier) {
+    return false;
+  }
+  if (requirements.size() != other.requirements.size()) {
+    return false;
+  }
+  if (updates.size() != other.updates.size()) {
+    return false;
+  }
+  // Note: Deep comparison of requirements and updates is not implemented

Review Comment:
   Even for testing, this implementation is wrong. I'd recommend adding a 
`virtual bool Equals(const TableRequirement& other) const` to 
`TableRequirement` and letting each subclass implement this function. Then add 
a `bool operator==(const TableRequirement& other)` in the base 
`TableRequirement` class to call `Equals`.



##########
src/iceberg/json_internal.h:
##########
@@ -26,6 +26,8 @@
 #include "iceberg/result.h"
 #include "iceberg/statistics_file.h"
 #include "iceberg/table_metadata.h"
+#include "iceberg/table_requirement.h"
+#include "iceberg/table_update.h"

Review Comment:
   ```suggestion
   ```
   
   Please use forward declarations, as always.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);

Review Comment:
   ```suggestion
         const auto& u = internal::checked_cast<const 
table::AssignUUID&>(update);
   ```
   Same for all below.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());
+      json[kLastColumnId] = u.last_column_id();
+      break;
+    }
+    case TableUpdate::Kind::kSetCurrentSchema: {
+      const auto& u = static_cast<const table::SetCurrentSchema&>(update);
+      json[kAction] = kActionSetCurrentSchema;
+      json[kSchemaId] = u.schema_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddPartitionSpec: {
+      const auto& u = static_cast<const table::AddPartitionSpec&>(update);
+      json[kAction] = kActionAddPartitionSpec;
+      json[kSpec] = ToJson(*u.spec());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultPartitionSpec: {
+      const auto& u = static_cast<const 
table::SetDefaultPartitionSpec&>(update);
+      json[kAction] = kActionSetDefaultPartitionSpec;
+      json[kSpecId] = u.spec_id();
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionSpecs: {
+      const auto& u = static_cast<const table::RemovePartitionSpecs&>(update);
+      json[kAction] = kActionRemovePartitionSpecs;
+      json[kSpecIds] = u.spec_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSchemas: {
+      const auto& u = static_cast<const table::RemoveSchemas&>(update);
+      json[kAction] = kActionRemoveSchemas;
+      json[kSchemaIds] = u.schema_ids();
+      break;
+    }
+    case TableUpdate::Kind::kAddSortOrder: {
+      const auto& u = static_cast<const table::AddSortOrder&>(update);
+      json[kAction] = kActionAddSortOrder;
+      json[kSortOrder] = ToJson(*u.sort_order());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultSortOrder: {
+      const auto& u = static_cast<const table::SetDefaultSortOrder&>(update);
+      json[kAction] = kActionSetDefaultSortOrder;
+      json[kSortOrderId] = u.sort_order_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddSnapshot: {
+      const auto& u = static_cast<const table::AddSnapshot&>(update);
+      json[kAction] = kActionAddSnapshot;
+      json[kSnapshot] = ToJson(*u.snapshot());
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshots: {
+      const auto& u = static_cast<const table::RemoveSnapshots&>(update);
+      json[kAction] = kActionRemoveSnapshots;
+      json[kSnapshotIds] = u.snapshot_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshotRef: {
+      const auto& u = static_cast<const table::RemoveSnapshotRef&>(update);
+      json[kAction] = kActionRemoveSnapshotRef;
+      json[kRefName] = u.ref_name();
+      break;
+    }
+    case TableUpdate::Kind::kSetSnapshotRef: {
+      const auto& u = static_cast<const table::SetSnapshotRef&>(update);
+      json[kAction] = kActionSetSnapshotRef;
+      json[kRefName] = u.ref_name();
+      json[kSnapshotId] = u.snapshot_id();
+      json[kType] = ToString(u.type());
+      if (u.min_snapshots_to_keep().has_value()) {
+        json[kMinSnapshotsToKeep] = u.min_snapshots_to_keep().value();
+      }
+      if (u.max_snapshot_age_ms().has_value()) {
+        json[kMaxSnapshotAgeMs] = u.max_snapshot_age_ms().value();
+      }
+      if (u.max_ref_age_ms().has_value()) {
+        json[kMaxRefAgeMs] = u.max_ref_age_ms().value();
+      }
+      break;
+    }
+    case TableUpdate::Kind::kSetProperties: {
+      const auto& u = static_cast<const table::SetProperties&>(update);
+      json[kAction] = kActionSetProperties;
+      json[kUpdates] = u.updated();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveProperties: {
+      const auto& u = static_cast<const table::RemoveProperties&>(update);
+      json[kAction] = kActionRemoveProperties;
+      json[kRemovals] = std::vector<std::string>(u.removed().begin(), 
u.removed().end());
+      break;
+    }
+    case TableUpdate::Kind::kSetLocation: {
+      const auto& u = static_cast<const table::SetLocation&>(update);
+      json[kAction] = kActionSetLocation;
+      json[kLocation] = u.location();
+      break;
+    }
+  }
+  return json;
+}
+
+nlohmann::json ToJson(const TableRequirement& requirement) {
+  nlohmann::json json;
+
+  if (dynamic_cast<const table::AssertDoesNotExist*>(&requirement)) {
+    json[kType] = kRequirementAssertDoesNotExist;
+  } else if (auto* r = dynamic_cast<const table::AssertUUID*>(&requirement)) {
+    json[kType] = kRequirementAssertUUID;
+    json[kUUID] = r->uuid();
+  } else if (auto* r = dynamic_cast<const 
table::AssertRefSnapshotID*>(&requirement)) {
+    json[kType] = kRequirementAssertRefSnapshotID;
+    json[kRefName] = r->ref_name();
+    if (r->snapshot_id().has_value()) {
+      json[kSnapshotId] = r->snapshot_id().value();
+    } else {
+      json[kSnapshotId] = nullptr;

Review Comment:
   ```suggestion
         json[kSnapshotId] = nlohmann::json::value_t::null;
   ```



##########
src/iceberg/catalog/rest/types.h:
##########
@@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
   bool operator==(const ListTablesResponse&) const = default;
 };
 
+/// \brief Request to commit changes to a table.
+struct ICEBERG_REST_EXPORT CommitTableRequest {
+  TableIdentifier identifier;
+  std::vector<std::shared_ptr<TableRequirement>> requirements;
+  std::vector<std::shared_ptr<TableUpdate>> updates;

Review Comment:
   ```suggestion
     std::vector<std::shared_ptr<TableRequirement>> requirements;  // required
     std::vector<std::shared_ptr<TableUpdate>> updates;            // required
   ```



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -70,6 +72,10 @@ constexpr std::string_view kCode = "code";
 constexpr std::string_view kStack = "stack";
 constexpr std::string_view kError = "error";
 
+// CommitTableRequest field constants

Review Comment:
   Remove this comment and merge the two constants below into the block above.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());
+      json[kLastColumnId] = u.last_column_id();
+      break;
+    }
+    case TableUpdate::Kind::kSetCurrentSchema: {
+      const auto& u = static_cast<const table::SetCurrentSchema&>(update);
+      json[kAction] = kActionSetCurrentSchema;
+      json[kSchemaId] = u.schema_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddPartitionSpec: {
+      const auto& u = static_cast<const table::AddPartitionSpec&>(update);
+      json[kAction] = kActionAddPartitionSpec;
+      json[kSpec] = ToJson(*u.spec());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultPartitionSpec: {
+      const auto& u = static_cast<const 
table::SetDefaultPartitionSpec&>(update);
+      json[kAction] = kActionSetDefaultPartitionSpec;
+      json[kSpecId] = u.spec_id();
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionSpecs: {
+      const auto& u = static_cast<const table::RemovePartitionSpecs&>(update);
+      json[kAction] = kActionRemovePartitionSpecs;
+      json[kSpecIds] = u.spec_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSchemas: {
+      const auto& u = static_cast<const table::RemoveSchemas&>(update);
+      json[kAction] = kActionRemoveSchemas;
+      json[kSchemaIds] = u.schema_ids();
+      break;
+    }
+    case TableUpdate::Kind::kAddSortOrder: {
+      const auto& u = static_cast<const table::AddSortOrder&>(update);
+      json[kAction] = kActionAddSortOrder;
+      json[kSortOrder] = ToJson(*u.sort_order());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultSortOrder: {
+      const auto& u = static_cast<const table::SetDefaultSortOrder&>(update);
+      json[kAction] = kActionSetDefaultSortOrder;
+      json[kSortOrderId] = u.sort_order_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddSnapshot: {
+      const auto& u = static_cast<const table::AddSnapshot&>(update);
+      json[kAction] = kActionAddSnapshot;
+      json[kSnapshot] = ToJson(*u.snapshot());
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshots: {
+      const auto& u = static_cast<const table::RemoveSnapshots&>(update);
+      json[kAction] = kActionRemoveSnapshots;
+      json[kSnapshotIds] = u.snapshot_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshotRef: {
+      const auto& u = static_cast<const table::RemoveSnapshotRef&>(update);
+      json[kAction] = kActionRemoveSnapshotRef;
+      json[kRefName] = u.ref_name();
+      break;
+    }
+    case TableUpdate::Kind::kSetSnapshotRef: {
+      const auto& u = static_cast<const table::SetSnapshotRef&>(update);
+      json[kAction] = kActionSetSnapshotRef;
+      json[kRefName] = u.ref_name();
+      json[kSnapshotId] = u.snapshot_id();
+      json[kType] = ToString(u.type());
+      if (u.min_snapshots_to_keep().has_value()) {
+        json[kMinSnapshotsToKeep] = u.min_snapshots_to_keep().value();
+      }
+      if (u.max_snapshot_age_ms().has_value()) {
+        json[kMaxSnapshotAgeMs] = u.max_snapshot_age_ms().value();
+      }
+      if (u.max_ref_age_ms().has_value()) {
+        json[kMaxRefAgeMs] = u.max_ref_age_ms().value();
+      }
+      break;
+    }
+    case TableUpdate::Kind::kSetProperties: {
+      const auto& u = static_cast<const table::SetProperties&>(update);
+      json[kAction] = kActionSetProperties;
+      json[kUpdates] = u.updated();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveProperties: {
+      const auto& u = static_cast<const table::RemoveProperties&>(update);
+      json[kAction] = kActionRemoveProperties;
+      json[kRemovals] = std::vector<std::string>(u.removed().begin(), 
u.removed().end());
+      break;
+    }
+    case TableUpdate::Kind::kSetLocation: {
+      const auto& u = static_cast<const table::SetLocation&>(update);
+      json[kAction] = kActionSetLocation;
+      json[kLocation] = u.location();
+      break;
+    }
+  }
+  return json;
+}
+
+nlohmann::json ToJson(const TableRequirement& requirement) {
+  nlohmann::json json;
+
+  if (dynamic_cast<const table::AssertDoesNotExist*>(&requirement)) {

Review Comment:
   Why not use `requirement.kind()` as below? `dynamic_cast` is slow.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());
+      json[kLastColumnId] = u.last_column_id();
+      break;
+    }
+    case TableUpdate::Kind::kSetCurrentSchema: {
+      const auto& u = static_cast<const table::SetCurrentSchema&>(update);
+      json[kAction] = kActionSetCurrentSchema;
+      json[kSchemaId] = u.schema_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddPartitionSpec: {
+      const auto& u = static_cast<const table::AddPartitionSpec&>(update);
+      json[kAction] = kActionAddPartitionSpec;
+      json[kSpec] = ToJson(*u.spec());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultPartitionSpec: {
+      const auto& u = static_cast<const 
table::SetDefaultPartitionSpec&>(update);
+      json[kAction] = kActionSetDefaultPartitionSpec;
+      json[kSpecId] = u.spec_id();
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionSpecs: {
+      const auto& u = static_cast<const table::RemovePartitionSpecs&>(update);
+      json[kAction] = kActionRemovePartitionSpecs;
+      json[kSpecIds] = u.spec_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSchemas: {
+      const auto& u = static_cast<const table::RemoveSchemas&>(update);
+      json[kAction] = kActionRemoveSchemas;
+      json[kSchemaIds] = u.schema_ids();
+      break;
+    }
+    case TableUpdate::Kind::kAddSortOrder: {
+      const auto& u = static_cast<const table::AddSortOrder&>(update);
+      json[kAction] = kActionAddSortOrder;
+      json[kSortOrder] = ToJson(*u.sort_order());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultSortOrder: {
+      const auto& u = static_cast<const table::SetDefaultSortOrder&>(update);
+      json[kAction] = kActionSetDefaultSortOrder;
+      json[kSortOrderId] = u.sort_order_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddSnapshot: {
+      const auto& u = static_cast<const table::AddSnapshot&>(update);
+      json[kAction] = kActionAddSnapshot;
+      json[kSnapshot] = ToJson(*u.snapshot());
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshots: {
+      const auto& u = static_cast<const table::RemoveSnapshots&>(update);
+      json[kAction] = kActionRemoveSnapshots;
+      json[kSnapshotIds] = u.snapshot_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshotRef: {
+      const auto& u = static_cast<const table::RemoveSnapshotRef&>(update);
+      json[kAction] = kActionRemoveSnapshotRef;
+      json[kRefName] = u.ref_name();
+      break;
+    }
+    case TableUpdate::Kind::kSetSnapshotRef: {
+      const auto& u = static_cast<const table::SetSnapshotRef&>(update);
+      json[kAction] = kActionSetSnapshotRef;
+      json[kRefName] = u.ref_name();
+      json[kSnapshotId] = u.snapshot_id();
+      json[kType] = ToString(u.type());
+      if (u.min_snapshots_to_keep().has_value()) {
+        json[kMinSnapshotsToKeep] = u.min_snapshots_to_keep().value();
+      }
+      if (u.max_snapshot_age_ms().has_value()) {
+        json[kMaxSnapshotAgeMs] = u.max_snapshot_age_ms().value();
+      }
+      if (u.max_ref_age_ms().has_value()) {
+        json[kMaxRefAgeMs] = u.max_ref_age_ms().value();
+      }
+      break;
+    }
+    case TableUpdate::Kind::kSetProperties: {
+      const auto& u = static_cast<const table::SetProperties&>(update);
+      json[kAction] = kActionSetProperties;
+      json[kUpdates] = u.updated();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveProperties: {
+      const auto& u = static_cast<const table::RemoveProperties&>(update);
+      json[kAction] = kActionRemoveProperties;
+      json[kRemovals] = std::vector<std::string>(u.removed().begin(), 
u.removed().end());
+      break;
+    }
+    case TableUpdate::Kind::kSetLocation: {
+      const auto& u = static_cast<const table::SetLocation&>(update);
+      json[kAction] = kActionSetLocation;
+      json[kLocation] = u.location();
+      break;
+    }
+  }
+  return json;
+}
+
+nlohmann::json ToJson(const TableRequirement& requirement) {
+  nlohmann::json json;
+
+  if (dynamic_cast<const table::AssertDoesNotExist*>(&requirement)) {
+    json[kType] = kRequirementAssertDoesNotExist;
+  } else if (auto* r = dynamic_cast<const table::AssertUUID*>(&requirement)) {
+    json[kType] = kRequirementAssertUUID;
+    json[kUUID] = r->uuid();
+  } else if (auto* r = dynamic_cast<const 
table::AssertRefSnapshotID*>(&requirement)) {
+    json[kType] = kRequirementAssertRefSnapshotID;
+    json[kRefName] = r->ref_name();
+    if (r->snapshot_id().has_value()) {
+      json[kSnapshotId] = r->snapshot_id().value();
+    } else {
+      json[kSnapshotId] = nullptr;
+    }
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertLastAssignedFieldId*>(&requirement)) {
+    json[kType] = kRequirementAssertLastAssignedFieldId;
+    json[kLastAssignedFieldId] = r->last_assigned_field_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertCurrentSchemaID*>(&requirement)) {
+    json[kType] = kRequirementAssertCurrentSchemaID;
+    json[kCurrentSchemaId] = r->schema_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertLastAssignedPartitionId*>(
+                 &requirement)) {
+    json[kType] = kRequirementAssertLastAssignedPartitionId;
+    json[kLastAssignedPartitionId] = r->last_assigned_partition_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertDefaultSpecID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSpecID;
+    json[kDefaultSpecId] = r->spec_id();
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertDefaultSortOrderID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSortOrderID;
+    json[kDefaultSortOrderId] = r->sort_order_id();
+  }
+
+  return json;
+}
+
+Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(
+    const nlohmann::json& json, const std::shared_ptr<Schema>& schema) {
+  ICEBERG_ASSIGN_OR_RAISE(auto action, GetJsonValue<std::string>(json, 
kAction));
+
+  if (action == kActionAssignUUID) {
+    ICEBERG_ASSIGN_OR_RAISE(auto uuid, GetJsonValue<std::string>(json, kUUID));
+    return std::make_unique<table::AssignUUID>(std::move(uuid));
+  }
+  if (action == kActionUpgradeFormatVersion) {
+    ICEBERG_ASSIGN_OR_RAISE(auto format_version,
+                            GetJsonValue<int8_t>(json, kFormatVersion));
+    return std::make_unique<table::UpgradeFormatVersion>(format_version);
+  }
+  if (action == kActionAddSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_json,
+                            GetJsonValue<nlohmann::json>(json, kSchema));
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed_schema, SchemaFromJson(schema_json));
+    ICEBERG_ASSIGN_OR_RAISE(auto last_column_id,
+                            GetJsonValue<int32_t>(json, kLastColumnId));
+    return std::make_unique<table::AddSchema>(std::move(parsed_schema), 
last_column_id);
+  }
+  if (action == kActionSetCurrentSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, 
kSchemaId));
+    return std::make_unique<table::SetCurrentSchema>(schema_id);
+  }
+  if (action == kActionAddPartitionSpec) {
+    if (!schema) {
+      return NotSupported(
+          "Cannot parse AddPartitionSpec without schema context. Use the 
overload "
+          "that takes a schema parameter.");
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_json, GetJsonValue<nlohmann::json>(json, 
kSpec));
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id_opt,
+                            GetJsonValueOptional<int32_t>(spec_json, kSpecId));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec,
+        PartitionSpecFromJson(schema, spec_json,
+                              
spec_id_opt.value_or(PartitionSpec::kInitialSpecId)));
+    return std::make_unique<table::AddPartitionSpec>(std::move(spec));
+  }
+  if (action == kActionSetDefaultPartitionSpec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, 
kSpecId));
+    return std::make_unique<table::SetDefaultPartitionSpec>(spec_id);
+  }
+  if (action == kActionRemovePartitionSpecs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_ids,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSpecIds));
+    return std::make_unique<table::RemovePartitionSpecs>(std::move(spec_ids));
+  }
+  if (action == kActionRemoveSchemas) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_ids_vec,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSchemaIds));
+    std::unordered_set<int32_t> schema_ids(schema_ids_vec.begin(), 
schema_ids_vec.end());
+    return std::make_unique<table::RemoveSchemas>(std::move(schema_ids));
+  }
+  if (action == kActionAddSortOrder) {
+    if (!schema) {
+      return NotSupported(
+          "Cannot parse AddSortOrder without schema context. Use the overload "
+          "that takes a schema parameter.");
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_json,
+                            GetJsonValue<nlohmann::json>(json, kSortOrder));
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order, 
SortOrderFromJson(sort_order_json, schema));
+    return std::make_unique<table::AddSortOrder>(std::move(sort_order));
+  }
+  if (action == kActionSetDefaultSortOrder) {
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_id,
+                            GetJsonValue<int32_t>(json, kSortOrderId));
+    return std::make_unique<table::SetDefaultSortOrder>(sort_order_id);
+  }
+  if (action == kActionAddSnapshot) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_json,
+                            GetJsonValue<nlohmann::json>(json, kSnapshot));
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot, SnapshotFromJson(snapshot_json));
+    return std::make_unique<table::AddSnapshot>(std::move(snapshot));
+  }
+  if (action == kActionRemoveSnapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_ids,
+                            GetJsonValue<std::vector<int64_t>>(json, 
kSnapshotIds));
+    return std::make_unique<table::RemoveSnapshots>(std::move(snapshot_ids));
+  }
+  if (action == kActionRemoveSnapshotRef) {
+    ICEBERG_ASSIGN_OR_RAISE(auto ref_name, GetJsonValue<std::string>(json, 
kRefName));
+    return std::make_unique<table::RemoveSnapshotRef>(std::move(ref_name));
+  }
+  if (action == kActionSetSnapshotRef) {
+    ICEBERG_ASSIGN_OR_RAISE(auto ref_name, GetJsonValue<std::string>(json, 
kRefName));
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, 
kSnapshotId));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto type,
+        GetJsonValue<std::string>(json, 
kType).and_then(SnapshotRefTypeFromString));
+    ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots,
+                            GetJsonValueOptional<int32_t>(json, 
kMinSnapshotsToKeep));
+    ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age,
+                            GetJsonValueOptional<int64_t>(json, 
kMaxSnapshotAgeMs));
+    ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
+                            GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
+    return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), 
snapshot_id, type,
+                                                   min_snapshots, 
max_snapshot_age,
+                                                   max_ref_age);
+  }
+  if (action == kActionSetProperties) {
+    using StringMap = std::unordered_map<std::string, std::string>;
+    ICEBERG_ASSIGN_OR_RAISE(auto updates, GetJsonValue<StringMap>(json, 
kUpdates));
+    return std::make_unique<table::SetProperties>(std::move(updates));
+  }
+  if (action == kActionRemoveProperties) {
+    ICEBERG_ASSIGN_OR_RAISE(auto removals_vec,
+                            GetJsonValue<std::vector<std::string>>(json, 
kRemovals));
+    std::unordered_set<std::string> removals(removals_vec.begin(), 
removals_vec.end());
+    return std::make_unique<table::RemoveProperties>(std::move(removals));
+  }
+  if (action == kActionSetLocation) {
+    ICEBERG_ASSIGN_OR_RAISE(auto location, GetJsonValue<std::string>(json, 
kLocation));
+    return std::make_unique<table::SetLocation>(std::move(location));
+  }
+
+  return JsonParseError("Unknown table update action: {}", action);
+}
+
+Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& 
json) {
+  return TableUpdateFromJson(json, nullptr);

Review Comment:
   ```suggestion
     return TableUpdateFromJson(json, /*schema=*/nullptr);
   ```
   
   Usually in this case, we should remove this overload and always require 
callers to pass the schema, so that users are less confused.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());
+      json[kLastColumnId] = u.last_column_id();
+      break;
+    }
+    case TableUpdate::Kind::kSetCurrentSchema: {
+      const auto& u = static_cast<const table::SetCurrentSchema&>(update);
+      json[kAction] = kActionSetCurrentSchema;
+      json[kSchemaId] = u.schema_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddPartitionSpec: {
+      const auto& u = static_cast<const table::AddPartitionSpec&>(update);
+      json[kAction] = kActionAddPartitionSpec;
+      json[kSpec] = ToJson(*u.spec());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultPartitionSpec: {
+      const auto& u = static_cast<const 
table::SetDefaultPartitionSpec&>(update);
+      json[kAction] = kActionSetDefaultPartitionSpec;
+      json[kSpecId] = u.spec_id();
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionSpecs: {
+      const auto& u = static_cast<const table::RemovePartitionSpecs&>(update);
+      json[kAction] = kActionRemovePartitionSpecs;
+      json[kSpecIds] = u.spec_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSchemas: {
+      const auto& u = static_cast<const table::RemoveSchemas&>(update);
+      json[kAction] = kActionRemoveSchemas;
+      json[kSchemaIds] = u.schema_ids();
+      break;
+    }
+    case TableUpdate::Kind::kAddSortOrder: {
+      const auto& u = static_cast<const table::AddSortOrder&>(update);
+      json[kAction] = kActionAddSortOrder;
+      json[kSortOrder] = ToJson(*u.sort_order());
+      break;
+    }
+    case TableUpdate::Kind::kSetDefaultSortOrder: {
+      const auto& u = static_cast<const table::SetDefaultSortOrder&>(update);
+      json[kAction] = kActionSetDefaultSortOrder;
+      json[kSortOrderId] = u.sort_order_id();
+      break;
+    }
+    case TableUpdate::Kind::kAddSnapshot: {
+      const auto& u = static_cast<const table::AddSnapshot&>(update);
+      json[kAction] = kActionAddSnapshot;
+      json[kSnapshot] = ToJson(*u.snapshot());
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshots: {
+      const auto& u = static_cast<const table::RemoveSnapshots&>(update);
+      json[kAction] = kActionRemoveSnapshots;
+      json[kSnapshotIds] = u.snapshot_ids();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveSnapshotRef: {
+      const auto& u = static_cast<const table::RemoveSnapshotRef&>(update);
+      json[kAction] = kActionRemoveSnapshotRef;
+      json[kRefName] = u.ref_name();
+      break;
+    }
+    case TableUpdate::Kind::kSetSnapshotRef: {
+      const auto& u = static_cast<const table::SetSnapshotRef&>(update);
+      json[kAction] = kActionSetSnapshotRef;
+      json[kRefName] = u.ref_name();
+      json[kSnapshotId] = u.snapshot_id();
+      json[kType] = ToString(u.type());
+      if (u.min_snapshots_to_keep().has_value()) {
+        json[kMinSnapshotsToKeep] = u.min_snapshots_to_keep().value();
+      }
+      if (u.max_snapshot_age_ms().has_value()) {
+        json[kMaxSnapshotAgeMs] = u.max_snapshot_age_ms().value();
+      }
+      if (u.max_ref_age_ms().has_value()) {
+        json[kMaxRefAgeMs] = u.max_ref_age_ms().value();
+      }
+      break;
+    }
+    case TableUpdate::Kind::kSetProperties: {
+      const auto& u = static_cast<const table::SetProperties&>(update);
+      json[kAction] = kActionSetProperties;
+      json[kUpdates] = u.updated();
+      break;
+    }
+    case TableUpdate::Kind::kRemoveProperties: {
+      const auto& u = static_cast<const table::RemoveProperties&>(update);
+      json[kAction] = kActionRemoveProperties;
+      json[kRemovals] = std::vector<std::string>(u.removed().begin(), 
u.removed().end());
+      break;
+    }
+    case TableUpdate::Kind::kSetLocation: {
+      const auto& u = static_cast<const table::SetLocation&>(update);
+      json[kAction] = kActionSetLocation;
+      json[kLocation] = u.location();
+      break;
+    }
+  }
+  return json;
+}
+
+nlohmann::json ToJson(const TableRequirement& requirement) {
+  nlohmann::json json;
+
+  if (dynamic_cast<const table::AssertDoesNotExist*>(&requirement)) {
+    json[kType] = kRequirementAssertDoesNotExist;
+  } else if (auto* r = dynamic_cast<const table::AssertUUID*>(&requirement)) {
+    json[kType] = kRequirementAssertUUID;
+    json[kUUID] = r->uuid();
+  } else if (auto* r = dynamic_cast<const 
table::AssertRefSnapshotID*>(&requirement)) {
+    json[kType] = kRequirementAssertRefSnapshotID;
+    json[kRefName] = r->ref_name();
+    if (r->snapshot_id().has_value()) {
+      json[kSnapshotId] = r->snapshot_id().value();
+    } else {
+      json[kSnapshotId] = nullptr;
+    }
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertLastAssignedFieldId*>(&requirement)) {
+    json[kType] = kRequirementAssertLastAssignedFieldId;
+    json[kLastAssignedFieldId] = r->last_assigned_field_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertCurrentSchemaID*>(&requirement)) {
+    json[kType] = kRequirementAssertCurrentSchemaID;
+    json[kCurrentSchemaId] = r->schema_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertLastAssignedPartitionId*>(
+                 &requirement)) {
+    json[kType] = kRequirementAssertLastAssignedPartitionId;
+    json[kLastAssignedPartitionId] = r->last_assigned_partition_id();
+  } else if (auto* r = dynamic_cast<const 
table::AssertDefaultSpecID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSpecID;
+    json[kDefaultSpecId] = r->spec_id();
+  } else if (auto* r =
+                 dynamic_cast<const 
table::AssertDefaultSortOrderID*>(&requirement)) {
+    json[kType] = kRequirementAssertDefaultSortOrderID;
+    json[kDefaultSortOrderId] = r->sort_order_id();
+  }
+
+  return json;
+}
+
+Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(
+    const nlohmann::json& json, const std::shared_ptr<Schema>& schema) {
+  ICEBERG_ASSIGN_OR_RAISE(auto action, GetJsonValue<std::string>(json, 
kAction));
+
+  if (action == kActionAssignUUID) {
+    ICEBERG_ASSIGN_OR_RAISE(auto uuid, GetJsonValue<std::string>(json, kUUID));
+    return std::make_unique<table::AssignUUID>(std::move(uuid));
+  }
+  if (action == kActionUpgradeFormatVersion) {
+    ICEBERG_ASSIGN_OR_RAISE(auto format_version,
+                            GetJsonValue<int8_t>(json, kFormatVersion));
+    return std::make_unique<table::UpgradeFormatVersion>(format_version);
+  }
+  if (action == kActionAddSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_json,
+                            GetJsonValue<nlohmann::json>(json, kSchema));
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed_schema, SchemaFromJson(schema_json));
+    ICEBERG_ASSIGN_OR_RAISE(auto last_column_id,
+                            GetJsonValue<int32_t>(json, kLastColumnId));
+    return std::make_unique<table::AddSchema>(std::move(parsed_schema), 
last_column_id);
+  }
+  if (action == kActionSetCurrentSchema) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, 
kSchemaId));
+    return std::make_unique<table::SetCurrentSchema>(schema_id);
+  }
+  if (action == kActionAddPartitionSpec) {
+    if (!schema) {
+      return NotSupported(
+          "Cannot parse AddPartitionSpec without schema context. Use the 
overload "
+          "that takes a schema parameter.");
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_json, GetJsonValue<nlohmann::json>(json, 
kSpec));
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id_opt,
+                            GetJsonValueOptional<int32_t>(spec_json, kSpecId));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec,
+        PartitionSpecFromJson(schema, spec_json,
+                              
spec_id_opt.value_or(PartitionSpec::kInitialSpecId)));
+    return std::make_unique<table::AddPartitionSpec>(std::move(spec));
+  }
+  if (action == kActionSetDefaultPartitionSpec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, 
kSpecId));
+    return std::make_unique<table::SetDefaultPartitionSpec>(spec_id);
+  }
+  if (action == kActionRemovePartitionSpecs) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_ids,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSpecIds));
+    return std::make_unique<table::RemovePartitionSpecs>(std::move(spec_ids));
+  }
+  if (action == kActionRemoveSchemas) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_ids_vec,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kSchemaIds));
+    std::unordered_set<int32_t> schema_ids(schema_ids_vec.begin(), 
schema_ids_vec.end());
+    return std::make_unique<table::RemoveSchemas>(std::move(schema_ids));
+  }
+  if (action == kActionAddSortOrder) {
+    if (!schema) {

Review Comment:
   Use `ICEBERG_PRECHECK` here, because `InvalidArgument` is more appropriate 
than `NotSupported`. The same applies below.



##########
src/iceberg/json_internal.cc:
##########
@@ -1214,4 +1262,336 @@ Result<Namespace> NamespaceFromJson(const 
nlohmann::json& json) {
   return ns;
 }
 
+nlohmann::json ToJson(const TableUpdate& update) {
+  nlohmann::json json;
+  switch (update.kind()) {
+    case TableUpdate::Kind::kAssignUUID: {
+      const auto& u = static_cast<const table::AssignUUID&>(update);
+      json[kAction] = kActionAssignUUID;
+      json[kUUID] = u.uuid();
+      break;
+    }
+    case TableUpdate::Kind::kUpgradeFormatVersion: {
+      const auto& u = static_cast<const table::UpgradeFormatVersion&>(update);
+      json[kAction] = kActionUpgradeFormatVersion;
+      json[kFormatVersion] = u.format_version();
+      break;
+    }
+    case TableUpdate::Kind::kAddSchema: {
+      const auto& u = static_cast<const table::AddSchema&>(update);
+      json[kAction] = kActionAddSchema;
+      json[kSchema] = ToJson(*u.schema());

Review Comment:
   For all pointer types, when the pointer is null we need to set 
`nlohmann::json::value_t::null` if it is a required field, or skip the field 
otherwise. The same applies below.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -390,6 +396,60 @@ Result<CreateTableRequest> 
CreateTableRequestFromJson(const nlohmann::json& json
   return request;
 }
 
+// CommitTableRequest serialization
+nlohmann::json ToJson(const CommitTableRequest& request) {
+  nlohmann::json json;
+  if (!request.identifier.name.empty()) {
+    json[kIdentifier] = iceberg::ToJson(request.identifier);
+  }
+
+  nlohmann::json requirements_json = nlohmann::json::array();
+  for (const auto& req : request.requirements) {
+    requirements_json.push_back(iceberg::ToJson(*req));
+  }
+  json[kRequirements] = std::move(requirements_json);
+
+  nlohmann::json updates_json = nlohmann::json::array();
+  for (const auto& update : request.updates) {
+    updates_json.push_back(iceberg::ToJson(*update));
+  }
+  json[kUpdates] = std::move(updates_json);
+
+  return json;
+}
+
+Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& 
json) {
+  CommitTableRequest request;
+  if (json.contains(kIdentifier)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
+                            GetJsonValue<nlohmann::json>(json, kIdentifier));
+    ICEBERG_ASSIGN_OR_RAISE(request.identifier, 
TableIdentifierFromJson(identifier_json));
+  }
+  // Note: requirements and updates deserialization would be complex
+  // and is not typically needed for the client side
+  ICEBERG_RETURN_UNEXPECTED(request.Validate());
+  return request;
+}
+
+// CommitTableResponse serialization
+nlohmann::json ToJson(const CommitTableResponse& response) {
+  nlohmann::json json;
+  SetOptionalStringField(json, kMetadataLocation, response.metadata_location);
+  json[kMetadata] = ToJson(*response.metadata);

Review Comment:
   Check for null before dereferencing it.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -390,6 +396,60 @@ Result<CreateTableRequest> 
CreateTableRequestFromJson(const nlohmann::json& json
   return request;
 }
 
+// CommitTableRequest serialization
+nlohmann::json ToJson(const CommitTableRequest& request) {
+  nlohmann::json json;
+  if (!request.identifier.name.empty()) {
+    json[kIdentifier] = iceberg::ToJson(request.identifier);

Review Comment:
   Can we remove the `iceberg::` prefix here and below?



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -244,9 +244,30 @@ Status RestCatalog::UpdateNamespaceProperties(
   return {};
 }
 
-Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
-    [[maybe_unused]] const Namespace& ns) const {
-  return NotImplemented("Not implemented");
+Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& 
ns) const {
+  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
+
+  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
+  std::vector<TableIdentifier> result;
+  std::string next_token;
+  while (true) {
+    std::unordered_map<std::string, std::string> params;
+    if (!next_token.empty()) {
+      params[kQueryParamPageToken] = next_token;
+    }
+    ICEBERG_ASSIGN_OR_RAISE(
+        const auto response,
+        client_->Get(path, params, /*headers=*/{}, 
*TableErrorHandler::Instance()));
+    ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
+    ICEBERG_ASSIGN_OR_RAISE(auto list_response, 
ListTablesResponseFromJson(json));
+    result.insert(result.end(), list_response.identifiers.begin(),
+                  list_response.identifiers.end());
+    if (list_response.next_page_token.empty()) {
+      return result;
+    }
+    next_token = list_response.next_page_token;

Review Comment:
   ```suggestion
       next_token = std::move(list_response.next_page_token);
   ```



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -390,6 +396,60 @@ Result<CreateTableRequest> 
CreateTableRequestFromJson(const nlohmann::json& json
   return request;
 }
 
+// CommitTableRequest serialization
+nlohmann::json ToJson(const CommitTableRequest& request) {
+  nlohmann::json json;
+  if (!request.identifier.name.empty()) {
+    json[kIdentifier] = iceberg::ToJson(request.identifier);
+  }
+
+  nlohmann::json requirements_json = nlohmann::json::array();
+  for (const auto& req : request.requirements) {
+    requirements_json.push_back(iceberg::ToJson(*req));
+  }
+  json[kRequirements] = std::move(requirements_json);
+
+  nlohmann::json updates_json = nlohmann::json::array();
+  for (const auto& update : request.updates) {
+    updates_json.push_back(iceberg::ToJson(*update));
+  }
+  json[kUpdates] = std::move(updates_json);
+
+  return json;
+}
+
+Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& 
json) {
+  CommitTableRequest request;
+  if (json.contains(kIdentifier)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
+                            GetJsonValue<nlohmann::json>(json, kIdentifier));
+    ICEBERG_ASSIGN_OR_RAISE(request.identifier, 
TableIdentifierFromJson(identifier_json));
+  }
+  // Note: requirements and updates deserialization would be complex
+  // and is not typically needed for the client side
+  ICEBERG_RETURN_UNEXPECTED(request.Validate());
+  return request;
+}
+
+// CommitTableResponse serialization
+nlohmann::json ToJson(const CommitTableResponse& response) {
+  nlohmann::json json;
+  SetOptionalStringField(json, kMetadataLocation, response.metadata_location);
+  json[kMetadata] = ToJson(*response.metadata);
+  return json;
+}
+
+Result<CommitTableResponse> CommitTableResponseFromJson(const nlohmann::json& 
json) {
+  CommitTableResponse response;
+  ICEBERG_ASSIGN_OR_RAISE(response.metadata_location,
+                          GetJsonValueOrDefault<std::string>(json, 
kMetadataLocation));

Review Comment:
   ```suggestion
                             GetJsonValue<std::string>(json, 
kMetadataLocation));
   ```
   
   This is a required field.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -280,10 +301,34 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& 
requirements,
-    [[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) 
{
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier,
+    const std::vector<std::unique_ptr<TableRequirement>>& requirements,
+    const std::vector<std::unique_ptr<TableUpdate>>& updates) {
+  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());

Review Comment:
   Ditto: `Endpoint::CreateTable()` is the wrong endpoint to check in 
`UpdateTable`.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -390,6 +396,60 @@ Result<CreateTableRequest> 
CreateTableRequestFromJson(const nlohmann::json& json
   return request;
 }
 
+// CommitTableRequest serialization
+nlohmann::json ToJson(const CommitTableRequest& request) {
+  nlohmann::json json;
+  if (!request.identifier.name.empty()) {
+    json[kIdentifier] = iceberg::ToJson(request.identifier);
+  }
+
+  nlohmann::json requirements_json = nlohmann::json::array();
+  for (const auto& req : request.requirements) {
+    requirements_json.push_back(iceberg::ToJson(*req));
+  }
+  json[kRequirements] = std::move(requirements_json);
+
+  nlohmann::json updates_json = nlohmann::json::array();
+  for (const auto& update : request.updates) {
+    updates_json.push_back(iceberg::ToJson(*update));
+  }
+  json[kUpdates] = std::move(updates_json);
+
+  return json;
+}
+
+Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& 
json) {
+  CommitTableRequest request;
+  if (json.contains(kIdentifier)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
+                            GetJsonValue<nlohmann::json>(json, kIdentifier));
+    ICEBERG_ASSIGN_OR_RAISE(request.identifier, 
TableIdentifierFromJson(identifier_json));
+  }
+  // Note: requirements and updates deserialization would be complex
+  // and is not typically needed for the client side
+  ICEBERG_RETURN_UNEXPECTED(request.Validate());
+  return request;
+}
+
+// CommitTableResponse serialization
+nlohmann::json ToJson(const CommitTableResponse& response) {
+  nlohmann::json json;
+  SetOptionalStringField(json, kMetadataLocation, response.metadata_location);

Review Comment:
   This field is not optional.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -280,10 +301,34 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& 
requirements,
-    [[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) 
{
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier,
+    const std::vector<std::unique_ptr<TableRequirement>>& requirements,
+    const std::vector<std::unique_ptr<TableUpdate>>& updates) {
+  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
+  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
+
+  // Build request with non-owning shared_ptr (using no-op deleter)

Review Comment:
   This is an anti-pattern. Please add `virtual std::unique_ptr<TableUpdate> 
Clone() const` and `virtual std::unique_ptr<TableRequirement> Clone() const` 
so that the request can hold real copies instead of non-owning `shared_ptr`s 
with a no-op deleter.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -244,9 +244,30 @@ Status RestCatalog::UpdateNamespaceProperties(
   return {};
 }
 
-Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
-    [[maybe_unused]] const Namespace& ns) const {
-  return NotImplemented("Not implemented");
+Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& 
ns) const {
+  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());

Review Comment:
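   This is wrong: `ListTables` is checking `Endpoint::CreateTable()`; it 
should check the endpoint for listing tables instead.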



##########
src/iceberg/json_internal.h:
##########
@@ -357,4 +359,46 @@ ICEBERG_EXPORT nlohmann::json ToJson(const Namespace& ns);
 /// \return A `Namespace` object or an error if the conversion fails.
 ICEBERG_EXPORT Result<Namespace> NamespaceFromJson(const nlohmann::json& json);
 
+/// \brief Serializes a `TableUpdate` object to JSON.
+///
+/// \param[in] update The `TableUpdate` object to be serialized.

Review Comment:
   Same for all below.



##########
src/iceberg/catalog/rest/json_internal.h:
##########
@@ -56,6 +56,8 @@ ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
 ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
+ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)

Review Comment:
   Please add test cases for all these new functions. It would be good to port 
`TestUpdateTableRequestParser.java`.



##########
src/iceberg/test/rest_catalog_test.cc:
##########
@@ -407,6 +410,49 @@ TEST_F(RestCatalogIntegrationTest, CreateTable) {
               HasErrorMessage("Table already exists: 
test_create_table.apple.ios.t1"));
 }
 
+TEST_F(RestCatalogIntegrationTest, ListTables) {
+  auto catalog_result = CreateCatalog();
+  ASSERT_THAT(catalog_result, IsOk());
+  auto& catalog = catalog_result.value();
+
+  // Create namespace
+  Namespace ns{.levels = {"test_list_tables"}};
+  auto status = catalog->CreateNamespace(ns, {});
+  EXPECT_THAT(status, IsOk());
+
+  // Initially no tables
+  auto list_result = catalog->ListTables(ns);
+  ASSERT_THAT(list_result, IsOk());
+  EXPECT_TRUE(list_result.value().empty());
+
+  // Create tables
+  auto schema = CreateDefaultSchema();
+  auto partition_spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+  std::unordered_map<std::string, std::string> table_properties;
+
+  TableIdentifier table1{.ns = ns, .name = "table1"};
+  auto create_result = catalog->CreateTable(table1, schema, partition_spec, 
sort_order,
+                                            "", table_properties);
+  ASSERT_THAT(create_result, IsOk());
+
+  TableIdentifier table2{.ns = ns, .name = "table2"};
+  create_result = catalog->CreateTable(table2, schema, partition_spec, 
sort_order, "",
+                                       table_properties);
+  ASSERT_THAT(create_result, IsOk());
+
+  // List and varify tables
+  list_result = catalog->ListTables(ns);
+  ASSERT_THAT(list_result, IsOk());
+  EXPECT_EQ(list_result.value().size(), 2);
+  std::vector<std::string> table_names;
+  for (const auto& id : list_result.value()) {
+    table_names.push_back(id.name);
+  }
+  std::ranges::sort(table_names);
+  EXPECT_EQ(table_names, (std::vector<std::string>{"table1", "table2"}));

Review Comment:
   You can directly use `testing::UnorderedElementsAre` here.



##########
src/iceberg/test/json_internal_test.cc:
##########
@@ -293,4 +295,674 @@ TEST(JsonInternalTest, NameMapping) {
   TestJsonConversion(*mapping, expected_json);
 }
 
+// TableUpdate JSON Serialization/Deserialization Tests
+TEST(TableUpdateJsonTest, AssignUUIDToJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  table::AssignUUID update(uuid);
+  nlohmann::json expected =
+      
R"({"action":"assign-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected) << "AssignUUID should convert to the correct JSON 
value";
+}
+
+TEST(TableUpdateJsonTest, AssignUUIDFromJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  nlohmann::json json =
+      
R"({"action":"assign-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+  table::AssignUUID expected(uuid);
+
+  auto parsed = TableUpdateFromJson(json);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value()->kind(), TableUpdate::Kind::kAssignUUID);
+  auto* actual = dynamic_cast<table::AssignUUID*>(parsed.value().get());

Review Comment:
   Replace `dynamic_cast` with `checked_cast`; otherwise we need to check 
that `dynamic_cast` does not return null before accessing the value.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -390,6 +396,60 @@ Result<CreateTableRequest> 
CreateTableRequestFromJson(const nlohmann::json& json
   return request;
 }
 
+// CommitTableRequest serialization
+nlohmann::json ToJson(const CommitTableRequest& request) {
+  nlohmann::json json;
+  if (!request.identifier.name.empty()) {
+    json[kIdentifier] = iceberg::ToJson(request.identifier);
+  }
+
+  nlohmann::json requirements_json = nlohmann::json::array();
+  for (const auto& req : request.requirements) {
+    requirements_json.push_back(iceberg::ToJson(*req));
+  }
+  json[kRequirements] = std::move(requirements_json);
+
+  nlohmann::json updates_json = nlohmann::json::array();
+  for (const auto& update : request.updates) {
+    updates_json.push_back(iceberg::ToJson(*update));
+  }
+  json[kUpdates] = std::move(updates_json);
+
+  return json;
+}
+
+Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& 
json) {
+  CommitTableRequest request;
+  if (json.contains(kIdentifier)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
+                            GetJsonValue<nlohmann::json>(json, kIdentifier));
+    ICEBERG_ASSIGN_OR_RAISE(request.identifier, 
TableIdentifierFromJson(identifier_json));
+  }
+  // Note: requirements and updates deserialization would be complex

Review Comment:
   I would return `NotImplemented` here rather than a partial (and wrong) 
result. I think the requirements and updates deserializers are implemented 
already, so please complete this function.



##########
src/iceberg/test/json_internal_test.cc:
##########
@@ -293,4 +295,674 @@ TEST(JsonInternalTest, NameMapping) {
   TestJsonConversion(*mapping, expected_json);
 }
 
+// TableUpdate JSON Serialization/Deserialization Tests
+TEST(TableUpdateJsonTest, AssignUUIDToJson) {
+  std::string uuid = "550e8400-e29b-41d4-a716-446655440000";
+  table::AssignUUID update(uuid);
+  nlohmann::json expected =
+      
R"({"action":"assign-uuid","uuid":"550e8400-e29b-41d4-a716-446655440000"})"_json;
+
+  auto json = ToJson(update);
+  EXPECT_EQ(json, expected) << "AssignUUID should convert to the correct JSON 
value";
+}
+
+TEST(TableUpdateJsonTest, AssignUUIDFromJson) {

Review Comment:
   Please combine the ToJson and FromJson test cases for the same object, just 
like the other cases in this file. Then you can use `operator==` directly to 
compare values instead of writing `EXPECT_EQ` manually, which makes the tests 
shorter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to