This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fb7dead8 chore: refactor Transaction::Apply to use separate smaller functions (#527)
fb7dead8 is described below

commit fb7dead8a6a3db5e5b61fe5cc51df90616416d5a
Author: Junwang Zhao <[email protected]>
AuthorDate: Mon Jan 26 14:05:19 2026 +0800

    chore: refactor Transaction::Apply to use separate smaller functions (#527)
---
 src/iceberg/transaction.cc | 287 ++++++++++++++++++++++++++-------------------
 src/iceberg/transaction.h  |  12 ++
 src/iceberg/util/macros.h  |  12 +-
 3 files changed, 186 insertions(+), 125 deletions(-)

diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 99b766a4..04ccdfb9 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -102,125 +102,46 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
 
 Status Transaction::Apply(PendingUpdate& update) {
   switch (update.kind()) {
-    case PendingUpdate::Kind::kExpireSnapshots: {
-      auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
-      if (!result.snapshot_ids_to_remove.empty()) {
-        metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
-      }
-      if (!result.refs_to_remove.empty()) {
-        for (const auto& ref_name : result.refs_to_remove) {
-          metadata_builder_->RemoveRef(ref_name);
-        }
-      }
-      if (!result.partition_spec_ids_to_remove.empty()) {
-        metadata_builder_->RemovePartitionSpecs(
-            std::move(result.partition_spec_ids_to_remove));
-      }
-      if (!result.schema_ids_to_remove.empty()) {
-        metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
-      }
-    } break;
-    case PendingUpdate::Kind::kSetSnapshot: {
-      auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
-      metadata_builder_->SetBranchSnapshot(snapshot_id,
-                                           std::string(SnapshotRef::kMainBranch));
-    } break;
-    case PendingUpdate::Kind::kUpdateLocation: {
-      auto& update_location = internal::checked_cast<UpdateLocation&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
-      metadata_builder_->SetLocation(location);
-    } break;
-    case PendingUpdate::Kind::kUpdatePartitionSpec: {
-      auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_partition_spec.Apply());
-      if (result.set_as_default) {
-        metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
-      } else {
-        metadata_builder_->AddPartitionSpec(std::move(result.spec));
-      }
-    } break;
-    case PendingUpdate::Kind::kUpdateProperties: {
-      auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
-      if (!result.updates.empty()) {
-        metadata_builder_->SetProperties(std::move(result.updates));
-      }
-      if (!result.removals.empty()) {
-        metadata_builder_->RemoveProperties(std::move(result.removals));
-      }
-      if (result.format_version.has_value()) {
-        metadata_builder_->UpgradeFormatVersion(result.format_version.value());
-      }
-    } break;
-    case PendingUpdate::Kind::kUpdateSchema: {
-      auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
-      metadata_builder_->SetCurrentSchema(std::move(result.schema),
-                                          result.new_last_column_id);
-    } break;
-    case PendingUpdate::Kind::kUpdateSnapshot: {
-      const auto& base = metadata_builder_->current();
-
-      auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
-
-      // Create a temp builder to check if this is an empty update
-      auto temp_update = TableMetadataBuilder::BuildFrom(&base);
-      if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
-        // This is a rollback operation
-        temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
-                                       result.target_branch);
-      } else if (result.stage_only) {
-        temp_update->AddSnapshot(result.snapshot);
-      } else {
-        temp_update->SetBranchSnapshot(std::move(result.snapshot), result.target_branch);
-      }
-
-      if (temp_update->changes().empty()) {
-        // Do not commit if the metadata has not changed. for example, this may happen
-        // when setting the current snapshot to an ID that is already current. note that
-        // this check uses identity.
-        return {};
-      }
-
-      for (const auto& change : temp_update->changes()) {
-        change->ApplyTo(*metadata_builder_);
-      }
-
-      // If the table UUID is missing, add it here. the UUID will be re-created each time
-      // this operation retries to ensure that if a concurrent operation assigns the UUID,
-      // this operation will not fail.
-      if (base.table_uuid.empty()) {
-        metadata_builder_->AssignUUID();
-      }
-    } break;
-    case PendingUpdate::Kind::kUpdateSnapshotReference: {
-      auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
-      for (const auto& name : result.to_remove) {
-        metadata_builder_->RemoveRef(name);
-      }
-      for (auto&& [name, ref] : result.to_set) {
-        metadata_builder_->SetRef(std::move(name), std::move(ref));
-      }
-    } break;
-    case PendingUpdate::Kind::kUpdateSortOrder: {
-      auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
-      metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
-    } break;
-    case PendingUpdate::Kind::kUpdateStatistics: {
-      auto& update_statistics = internal::checked_cast<UpdateStatistics&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
-      for (auto&& [_, stat_file] : result.to_set) {
-        metadata_builder_->SetStatistics(std::move(stat_file));
-      }
-      for (const auto& snapshot_id : result.to_remove) {
-        metadata_builder_->RemoveStatistics(snapshot_id);
-      }
-    } break;
+    case PendingUpdate::Kind::kExpireSnapshots:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyExpireSnapshots(internal::checked_cast<ExpireSnapshots&>(update)));
+      break;
+    case PendingUpdate::Kind::kSetSnapshot:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplySetSnapshot(internal::checked_cast<SetSnapshot&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateLocation:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateLocation(internal::checked_cast<UpdateLocation&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdatePartitionSpec:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdatePartitionSpec(internal::checked_cast<UpdatePartitionSpec&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateProperties:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateProperties(internal::checked_cast<UpdateProperties&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateSchema:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateSchema(internal::checked_cast<UpdateSchema&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateSnapshot:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateSnapshot(internal::checked_cast<SnapshotUpdate&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateSnapshotReference:
+      ICEBERG_RETURN_UNEXPECTED(ApplyUpdateSnapshotReference(
+          internal::checked_cast<UpdateSnapshotReference&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateSortOrder:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateSortOrder(internal::checked_cast<UpdateSortOrder&>(update)));
+      break;
+    case PendingUpdate::Kind::kUpdateStatistics:
+      ICEBERG_RETURN_UNEXPECTED(
+          ApplyUpdateStatistics(internal::checked_cast<UpdateStatistics&>(update)));
+      break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -235,6 +156,134 @@ Status Transaction::Apply(PendingUpdate& update) {
   return {};
 }
 
+Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  if (!result.snapshot_ids_to_remove.empty()) {
+    metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+  }
+  if (!result.refs_to_remove.empty()) {
+    for (const auto& ref_name : result.refs_to_remove) {
+      metadata_builder_->RemoveRef(ref_name);
+    }
+  }
+  if (!result.partition_spec_ids_to_remove.empty()) {
+    metadata_builder_->RemovePartitionSpecs(
+        std::move(result.partition_spec_ids_to_remove));
+  }
+  if (!result.schema_ids_to_remove.empty()) {
+    metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+  }
+  return {};
+}
+
+Status Transaction::ApplySetSnapshot(SetSnapshot& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, update.Apply());
+  metadata_builder_->SetBranchSnapshot(snapshot_id,
+                                       std::string(SnapshotRef::kMainBranch));
+  return {};
+}
+
+Status Transaction::ApplyUpdateLocation(UpdateLocation& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto location, update.Apply());
+  metadata_builder_->SetLocation(location);
+  return {};
+}
+
+Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  if (result.set_as_default) {
+    metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
+  } else {
+    metadata_builder_->AddPartitionSpec(std::move(result.spec));
+  }
+  return {};
+}
+
+Status Transaction::ApplyUpdateProperties(UpdateProperties& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  if (!result.updates.empty()) {
+    metadata_builder_->SetProperties(std::move(result.updates));
+  }
+  if (!result.removals.empty()) {
+    metadata_builder_->RemoveProperties(std::move(result.removals));
+  }
+  if (result.format_version.has_value()) {
+    metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+  }
+  return {};
+}
+
+Status Transaction::ApplyUpdateSchema(UpdateSchema& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  metadata_builder_->SetCurrentSchema(std::move(result.schema),
+                                      result.new_last_column_id);
+  return {};
+}
+
+Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) {
+  const auto& base = metadata_builder_->current();
+
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+
+  // Create a temp builder to check if this is an empty update
+  auto temp_update = TableMetadataBuilder::BuildFrom(&base);
+  if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
+    // This is a rollback operation
+    temp_update->SetBranchSnapshot(result.snapshot->snapshot_id, result.target_branch);
+  } else if (result.stage_only) {
+    temp_update->AddSnapshot(result.snapshot);
+  } else {
+    temp_update->SetBranchSnapshot(std::move(result.snapshot), result.target_branch);
+  }
+
+  if (temp_update->changes().empty()) {
+    // Do not commit if the metadata has not changed. for example, this may happen
+    // when setting the current snapshot to an ID that is already current. note that
+    // this check uses identity.
+    return {};
+  }
+
+  for (const auto& change : temp_update->changes()) {
+    change->ApplyTo(*metadata_builder_);
+  }
+
+  // If the table UUID is missing, add it here. the UUID will be re-created each time
+  // this operation retries to ensure that if a concurrent operation assigns the UUID,
+  // this operation will not fail.
+  if (base.table_uuid.empty()) {
+    metadata_builder_->AssignUUID();
+  }
+  return {};
+}
+
+Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  for (const auto& name : result.to_remove) {
+    metadata_builder_->RemoveRef(name);
+  }
+  for (auto&& [name, ref] : result.to_set) {
+    metadata_builder_->SetRef(std::move(name), std::move(ref));
+  }
+  return {};
+}
+
+Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update.Apply());
+  metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+  return {};
+}
+
+Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+  for (auto&& [_, stat_file] : result.to_set) {
+    metadata_builder_->SetStatistics(std::move(stat_file));
+  }
+  for (const auto& snapshot_id : result.to_remove) {
+    metadata_builder_->RemoveStatistics(snapshot_id);
+  }
+  return {};
+}
+
 Result<std::shared_ptr<Table>> Transaction::Commit() {
   if (committed_) {
     return Invalid("Transaction already committed");
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index f7110db8..3d5450e5 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -110,6 +110,18 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
   /// \brief Apply the pending changes to current table.
   Status Apply(PendingUpdate& updates);
 
+  // Helper methods for applying different types of updates
+  Status ApplyExpireSnapshots(ExpireSnapshots& update);
+  Status ApplySetSnapshot(SetSnapshot& update);
+  Status ApplyUpdateLocation(UpdateLocation& update);
+  Status ApplyUpdatePartitionSpec(UpdatePartitionSpec& update);
+  Status ApplyUpdateProperties(UpdateProperties& update);
+  Status ApplyUpdateSchema(UpdateSchema& update);
+  Status ApplyUpdateSnapshot(SnapshotUpdate& update);
+  Status ApplyUpdateSnapshotReference(UpdateSnapshotReference& update);
+  Status ApplyUpdateSortOrder(UpdateSortOrder& update);
+  Status ApplyUpdateStatistics(UpdateStatistics& update);
+
  private:
   friend class PendingUpdate;
 
diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h
index 9c80e585..e391d2bf 100644
--- a/src/iceberg/util/macros.h
+++ b/src/iceberg/util/macros.h
@@ -24,9 +24,9 @@
 #include "iceberg/exception.h"
 #include "iceberg/result.h"
 
-#define ICEBERG_RETURN_UNEXPECTED(result)                       \
-  if (auto&& result_name = result; !result_name) [[unlikely]] { \
-    return std::unexpected<Error>(result_name.error());         \
+#define ICEBERG_RETURN_UNEXPECTED(expr)                       \
+  if (auto&& result_name = expr; !result_name) [[unlikely]] { \
+    return std::unexpected<Error>(result_name.error());       \
   }
 
 #define ICEBERG_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \
@@ -68,9 +68,9 @@
    throw iceberg::IcebergError(error.message);               \
   }
 
-#define ICEBERG_THROW_NOT_OK(result)                            \
-  if (auto&& result_name = result; !result_name) [[unlikely]] { \
-    ERROR_TO_EXCEPTION(result_name.error());                    \
+#define ICEBERG_THROW_NOT_OK(expr)                            \
+  if (auto&& result_name = expr; !result_name) [[unlikely]] { \
+    ERROR_TO_EXCEPTION(result_name.error());                  \
   }
 
 #define ICEBERG_ASSIGN_OR_THROW_IMPL(result_name, lhs, rexpr) \

Reply via email to