This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fb7dead8 chore: refactor Transaction::Apply to use separate smaller functions (#527)
fb7dead8 is described below
commit fb7dead8a6a3db5e5b61fe5cc51df90616416d5a
Author: Junwang Zhao <[email protected]>
AuthorDate: Mon Jan 26 14:05:19 2026 +0800
chore: refactor Transaction::Apply to use separate smaller functions (#527)
---
src/iceberg/transaction.cc | 287 ++++++++++++++++++++++++++-------------------
src/iceberg/transaction.h | 12 ++
src/iceberg/util/macros.h | 12 +-
3 files changed, 186 insertions(+), 125 deletions(-)
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 99b766a4..04ccdfb9 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -102,125 +102,46 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
Status Transaction::Apply(PendingUpdate& update) {
switch (update.kind()) {
- case PendingUpdate::Kind::kExpireSnapshots: {
- auto& expire_snapshots =
internal::checked_cast<ExpireSnapshots&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
- if (!result.snapshot_ids_to_remove.empty()) {
-
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
- }
- if (!result.refs_to_remove.empty()) {
- for (const auto& ref_name : result.refs_to_remove) {
- metadata_builder_->RemoveRef(ref_name);
- }
- }
- if (!result.partition_spec_ids_to_remove.empty()) {
- metadata_builder_->RemovePartitionSpecs(
- std::move(result.partition_spec_ids_to_remove));
- }
- if (!result.schema_ids_to_remove.empty()) {
-
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
- }
- } break;
- case PendingUpdate::Kind::kSetSnapshot: {
- auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
- metadata_builder_->SetBranchSnapshot(snapshot_id,
-
std::string(SnapshotRef::kMainBranch));
- } break;
- case PendingUpdate::Kind::kUpdateLocation: {
- auto& update_location = internal::checked_cast<UpdateLocation&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
- metadata_builder_->SetLocation(location);
- } break;
- case PendingUpdate::Kind::kUpdatePartitionSpec: {
- auto& update_partition_spec =
internal::checked_cast<UpdatePartitionSpec&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_partition_spec.Apply());
- if (result.set_as_default) {
- metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
- } else {
- metadata_builder_->AddPartitionSpec(std::move(result.spec));
- }
- } break;
- case PendingUpdate::Kind::kUpdateProperties: {
- auto& update_properties =
internal::checked_cast<UpdateProperties&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
- if (!result.updates.empty()) {
- metadata_builder_->SetProperties(std::move(result.updates));
- }
- if (!result.removals.empty()) {
- metadata_builder_->RemoveProperties(std::move(result.removals));
- }
- if (result.format_version.has_value()) {
- metadata_builder_->UpgradeFormatVersion(result.format_version.value());
- }
- } break;
- case PendingUpdate::Kind::kUpdateSchema: {
- auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
- metadata_builder_->SetCurrentSchema(std::move(result.schema),
- result.new_last_column_id);
- } break;
- case PendingUpdate::Kind::kUpdateSnapshot: {
- const auto& base = metadata_builder_->current();
-
- auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
-
- // Create a temp builder to check if this is an empty update
- auto temp_update = TableMetadataBuilder::BuildFrom(&base);
- if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
- // This is a rollback operation
- temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
- result.target_branch);
- } else if (result.stage_only) {
- temp_update->AddSnapshot(result.snapshot);
- } else {
- temp_update->SetBranchSnapshot(std::move(result.snapshot),
result.target_branch);
- }
-
- if (temp_update->changes().empty()) {
- // Do not commit if the metadata has not changed. for example, this
may happen
- // when setting the current snapshot to an ID that is already current.
note that
- // this check uses identity.
- return {};
- }
-
- for (const auto& change : temp_update->changes()) {
- change->ApplyTo(*metadata_builder_);
- }
-
- // If the table UUID is missing, add it here. the UUID will be
re-created each time
- // this operation retries to ensure that if a concurrent operation
assigns the UUID,
- // this operation will not fail.
- if (base.table_uuid.empty()) {
- metadata_builder_->AssignUUID();
- }
- } break;
- case PendingUpdate::Kind::kUpdateSnapshotReference: {
- auto& update_ref =
internal::checked_cast<UpdateSnapshotReference&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
- for (const auto& name : result.to_remove) {
- metadata_builder_->RemoveRef(name);
- }
- for (auto&& [name, ref] : result.to_set) {
- metadata_builder_->SetRef(std::move(name), std::move(ref));
- }
- } break;
- case PendingUpdate::Kind::kUpdateSortOrder: {
- auto& update_sort_order =
internal::checked_cast<UpdateSortOrder&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
- metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
- } break;
- case PendingUpdate::Kind::kUpdateStatistics: {
- auto& update_statistics =
internal::checked_cast<UpdateStatistics&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
- for (auto&& [_, stat_file] : result.to_set) {
- metadata_builder_->SetStatistics(std::move(stat_file));
- }
- for (const auto& snapshot_id : result.to_remove) {
- metadata_builder_->RemoveStatistics(snapshot_id);
- }
- } break;
+ case PendingUpdate::Kind::kExpireSnapshots:  // Each case downcasts with internal::checked_cast and delegates to its Apply* helper; a failing helper's error is returned via ICEBERG_RETURN_UNEXPECTED.
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyExpireSnapshots(internal::checked_cast<ExpireSnapshots&>(update)));
+ break;
+ case PendingUpdate::Kind::kSetSnapshot:
+ ICEBERG_RETURN_UNEXPECTED(
+ ApplySetSnapshot(internal::checked_cast<SetSnapshot&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateLocation:
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdateLocation(internal::checked_cast<UpdateLocation&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdatePartitionSpec:
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdatePartitionSpec(internal::checked_cast<UpdatePartitionSpec&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateProperties:
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdateProperties(internal::checked_cast<UpdateProperties&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateSchema:
+ ICEBERG_RETURN_UNEXPECTED(
+ ApplyUpdateSchema(internal::checked_cast<UpdateSchema&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateSnapshot:  // note: the concrete type for this kind is SnapshotUpdate, not a class named UpdateSnapshot
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdateSnapshot(internal::checked_cast<SnapshotUpdate&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateSnapshotReference:
+ ICEBERG_RETURN_UNEXPECTED(ApplyUpdateSnapshotReference(
+ internal::checked_cast<UpdateSnapshotReference&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateSortOrder:
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdateSortOrder(internal::checked_cast<UpdateSortOrder&>(update)));
+ break;
+ case PendingUpdate::Kind::kUpdateStatistics:
+ ICEBERG_RETURN_UNEXPECTED(
+
ApplyUpdateStatistics(internal::checked_cast<UpdateStatistics&>(update)));
+ break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -235,6 +156,134 @@ Status Transaction::Apply(PendingUpdate& update) {
return {};
}
+Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) {  // Runs update.Apply(), then removes the listed snapshots, refs, partition specs and schemas via metadata_builder_ (each step is skipped when its list is empty).
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ if (!result.snapshot_ids_to_remove.empty()) {
+
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+ }
+ if (!result.refs_to_remove.empty()) {  // NOTE(review): redundant guard; the loop below already does nothing when the list is empty.
+ for (const auto& ref_name : result.refs_to_remove) {
+ metadata_builder_->RemoveRef(ref_name);
+ }
+ }
+ if (!result.partition_spec_ids_to_remove.empty()) {
+ metadata_builder_->RemovePartitionSpecs(
+ std::move(result.partition_spec_ids_to_remove));
+ }
+ if (!result.schema_ids_to_remove.empty()) {
+ metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+ }
+ return {};
+}
+
+Status Transaction::ApplySetSnapshot(SetSnapshot& update) {  // Points the main branch (SnapshotRef::kMainBranch) at the snapshot id returned by update.Apply().
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, update.Apply());
+ metadata_builder_->SetBranchSnapshot(snapshot_id,
+ std::string(SnapshotRef::kMainBranch));
+ return {};
+}
+
+Status Transaction::ApplyUpdateLocation(UpdateLocation& update) {  // Sets the table location to the value returned by update.Apply().
+ ICEBERG_ASSIGN_OR_RAISE(auto location, update.Apply());
+ metadata_builder_->SetLocation(location);
+ return {};
+}
+
+Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {  // Passes the spec from update.Apply() to SetDefaultPartitionSpec when result.set_as_default is true, otherwise to AddPartitionSpec.
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ if (result.set_as_default) {
+ metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
+ } else {
+ metadata_builder_->AddPartitionSpec(std::move(result.spec));
+ }
+ return {};
+}
+
+Status Transaction::ApplyUpdateProperties(UpdateProperties& update) {  // Applies property sets, then removals, then an optional format-version upgrade; empty update/removal sets are skipped.
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ if (!result.updates.empty()) {
+ metadata_builder_->SetProperties(std::move(result.updates));
+ }
+ if (!result.removals.empty()) {
+ metadata_builder_->RemoveProperties(std::move(result.removals));
+ }
+ if (result.format_version.has_value()) {
+ metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+ }
+ return {};
+}
+
+Status Transaction::ApplyUpdateSchema(UpdateSchema& update) {  // Sets the schema from update.Apply() as current, together with its new last column id.
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ metadata_builder_->SetCurrentSchema(std::move(result.schema),
+ result.new_last_column_id);
+ return {};
+}
+
+Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) {  // Stages the snapshot change on a temporary builder; if it records no changes the update is skipped, otherwise the changes are replayed onto metadata_builder_.
+ const auto& base = metadata_builder_->current();  // NOTE(review): reference into builder state that is read again (table_uuid) after the changes are replayed below; confirm that is the intended metadata.
+
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+
+ // Create a temp builder to check if this is an empty update
+ auto temp_update = TableMetadataBuilder::BuildFrom(&base);
+ if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {  // assumes result.snapshot is non-null — TODO confirm
+ // This is a rollback operation
+ temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
result.target_branch);
+ } else if (result.stage_only) {  // stage-only: add the snapshot without calling SetBranchSnapshot
+ temp_update->AddSnapshot(result.snapshot);
+ } else {
+ temp_update->SetBranchSnapshot(std::move(result.snapshot),
result.target_branch);
+ }
+
+ if (temp_update->changes().empty()) {
+ // Do not commit if the metadata has not changed. for example, this may
happen
+ // when setting the current snapshot to an ID that is already current.
note that
+ // this check uses identity.
+ return {};
+ }
+
+ for (const auto& change : temp_update->changes()) {
+ change->ApplyTo(*metadata_builder_);
+ }
+
+ // If the table UUID is missing, add it here. the UUID will be re-created
each time
+ // this operation retries to ensure that if a concurrent operation assigns
the UUID,
+ // this operation will not fail.
+ if (base.table_uuid.empty()) {
+ metadata_builder_->AssignUUID();
+ }
+ return {};
+}
+
+Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference&
update) {  // Removes the refs named in result.to_remove, then sets each (name, ref) pair from result.to_set.
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ for (const auto& name : result.to_remove) {
+ metadata_builder_->RemoveRef(name);
+ }
+ for (auto&& [name, ref] : result.to_set) {
+ metadata_builder_->SetRef(std::move(name), std::move(ref));
+ }
+ return {};
+}
+
+Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) {  // Sets the default sort order to the one returned by update.Apply().
+ ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update.Apply());
+ metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+ return {};
+}
+
+Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {  // Sets each statistics file from result.to_set, then removes statistics for each snapshot id in result.to_remove.
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ for (auto&& [_, stat_file] : result.to_set) {  // the key is unused; only the statistics file is applied
+ metadata_builder_->SetStatistics(std::move(stat_file));
+ }
+ for (const auto& snapshot_id : result.to_remove) {
+ metadata_builder_->RemoveStatistics(snapshot_id);
+ }
+ return {};
+}
+
Result<std::shared_ptr<Table>> Transaction::Commit() {
if (committed_) {
return Invalid("Transaction already committed");
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index f7110db8..3d5450e5 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -110,6 +110,18 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// \brief Apply the pending changes to current table.
Status Apply(PendingUpdate& updates);
+ // Helper methods for applying different types of updates. Each runs the given update's Apply() and records the result on metadata_builder_. NOTE(review): declared before `private:`, so they get the same access level as Apply() rather than being private.
+ Status ApplyExpireSnapshots(ExpireSnapshots& update);
+ Status ApplySetSnapshot(SetSnapshot& update);
+ Status ApplyUpdateLocation(UpdateLocation& update);
+ Status ApplyUpdatePartitionSpec(UpdatePartitionSpec& update);
+ Status ApplyUpdateProperties(UpdateProperties& update);
+ Status ApplyUpdateSchema(UpdateSchema& update);
+ Status ApplyUpdateSnapshot(SnapshotUpdate& update);
+ Status ApplyUpdateSnapshotReference(UpdateSnapshotReference& update);
+ Status ApplyUpdateSortOrder(UpdateSortOrder& update);
+ Status ApplyUpdateStatistics(UpdateStatistics& update);
+
private:
friend class PendingUpdate;
diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h
index 9c80e585..e391d2bf 100644
--- a/src/iceberg/util/macros.h
+++ b/src/iceberg/util/macros.h
@@ -24,9 +24,9 @@
#include "iceberg/exception.h"
#include "iceberg/result.h"
-#define ICEBERG_RETURN_UNEXPECTED(result) \
- if (auto&& result_name = result; !result_name) [[unlikely]] { \
- return std::unexpected<Error>(result_name.error()); \
+#define ICEBERG_RETURN_UNEXPECTED(expr) \
+ if (auto&& result_name = expr; !result_name) [[unlikely]] { \
+ return std::unexpected<Error>(result_name.error()); \
}  // Evaluates expr once; if it is falsy, returns std::unexpected<Error> carrying its error. NOTE(review): expands to a bare if-statement, so an unbraced `if (c) MACRO(x); else ...` mis-binds; brace such call sites.
#define ICEBERG_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \
@@ -68,9 +68,9 @@
throw iceberg::IcebergError(error.message); \
}
-#define ICEBERG_THROW_NOT_OK(result) \
- if (auto&& result_name = result; !result_name) [[unlikely]] { \
- ERROR_TO_EXCEPTION(result_name.error()); \
+#define ICEBERG_THROW_NOT_OK(expr) \
+ if (auto&& result_name = expr; !result_name) [[unlikely]] { \
+ ERROR_TO_EXCEPTION(result_name.error()); \
}  // Evaluates expr once; if it is falsy, hands its error to ERROR_TO_EXCEPTION. NOTE(review): expands to a bare if-statement, so brace it inside unbraced if/else.
#define ICEBERG_ASSIGN_OR_THROW_IMPL(result_name, lhs, rexpr) \