This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d1807682 feat: add UpdateSnapshotReference (#512)
d1807682 is described below
commit d1807682fc8de9ec8aa9d2206c65a0071501bb6f
Author: Junwang Zhao <[email protected]>
AuthorDate: Thu Jan 22 15:11:24 2026 +0800
feat: add UpdateSnapshotReference (#512)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/parquet/parquet_data_util.cc | 2 +-
src/iceberg/transaction.cc | 29 ++-
src/iceberg/transaction.h | 4 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/pending_update.h | 1 +
src/iceberg/update/update_snapshot_reference.cc | 244 ++++++++++++++++++++++++
src/iceberg/update/update_snapshot_reference.h | 157 +++++++++++++++
src/iceberg/util/snapshot_util.cc | 11 ++
src/iceberg/util/snapshot_util_internal.h | 11 ++
12 files changed, 457 insertions(+), 6 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 788b903f..359b79e6 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -94,6 +94,7 @@ set(ICEBERG_SOURCES
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
+ update/update_snapshot_reference.cc
update/update_sort_order.cc
update/update_statistics.cc
util/bucket_util.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index a1f88b36..05cb6f8d 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -112,6 +112,7 @@ iceberg_sources = files(
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
+ 'update/update_snapshot_reference.cc',
'update/update_sort_order.cc',
'update/update_statistics.cc',
'util/bucket_util.cc',
diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc
index 14d20ff9..43efd1cb 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -148,7 +148,7 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
return output_array;
}
-/// Templated implementation for projecting list arrays.
+/// \brief Templated implementation for projecting list arrays.
/// Works with both ListArray/ListType (32-bit offsets) and
/// LargeListArray/LargeListType (64-bit offsets).
template <typename ArrowListArrayType, typename ArrowListType>
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 7bd4a577..99b766a4 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -41,6 +41,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_snapshot_reference.h"
#include "iceberg/update/update_sort_order.h"
#include "iceberg/update/update_statistics.h"
#include "iceberg/util/checked_cast.h"
@@ -159,11 +160,6 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->SetCurrentSchema(std::move(result.schema),
result.new_last_column_id);
} break;
- case PendingUpdate::Kind::kUpdateSortOrder: {
- auto& update_sort_order =
internal::checked_cast<UpdateSortOrder&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
- metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
- } break;
case PendingUpdate::Kind::kUpdateSnapshot: {
const auto& base = metadata_builder_->current();
@@ -200,6 +196,21 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
+ case PendingUpdate::Kind::kUpdateSnapshotReference: {
+ auto& update_ref =
internal::checked_cast<UpdateSnapshotReference&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
+ for (const auto& name : result.to_remove) {
+ metadata_builder_->RemoveRef(name);
+ }
+ for (auto&& [name, ref] : result.to_set) {
+ metadata_builder_->SetRef(std::move(name), std::move(ref));
+ }
+ } break;
+ case PendingUpdate::Kind::kUpdateSortOrder: {
+ auto& update_sort_order =
internal::checked_cast<UpdateSortOrder&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
+ metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+ } break;
case PendingUpdate::Kind::kUpdateStatistics: {
auto& update_statistics =
internal::checked_cast<UpdateStatistics&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
@@ -335,4 +346,12 @@ Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
return update_statistics;
}
+Result<std::shared_ptr<UpdateSnapshotReference>>
+Transaction::NewUpdateSnapshotReference() {
+  // Bind the new update to this transaction, then register it through AddUpdate
+  // before handing it back to the caller.
+  ICEBERG_ASSIGN_OR_RAISE(auto update_ref,
+                          UpdateSnapshotReference::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref));
+  return update_ref;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 6d7816ee..f7110db8 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -97,6 +97,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// \brief Create a new FastAppend to append data files and commit the
changes.
Result<std::shared_ptr<FastAppend>> NewFastAppend();
+  /// \brief Create a new UpdateSnapshotReference to update snapshot references
+  /// (branches and tags) and commit the changes.
+  ///
+  /// The returned update is already registered with this transaction.
+  ///
+  /// \return The new update, or an error if it could not be created or registered.
+  Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
+
private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 2e099c1b..9e21088f 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -198,6 +198,7 @@ class UpdateLocation;
class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
+class UpdateSnapshotReference;
class UpdateSortOrder;
class UpdateStatistics;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 8dc92c00..e00b1e6e 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -25,6 +25,7 @@ install_headers(
'update_location.h',
'update_partition_spec.h',
'update_schema.h',
+ 'update_snapshot_reference.h',
'update_sort_order.h',
'update_properties.h',
'update_statistics.h',
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 0c0b6e3e..e5d583d1 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -49,6 +49,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdateProperties,
kUpdateSchema,
kUpdateSnapshot,
+ kUpdateSnapshotReference,
kUpdateSortOrder,
kUpdateStatistics,
};
diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc
new file mode 100644
index 00000000..923f0c8d
--- /dev/null
+++ b/src/iceberg/update/update_snapshot_reference.cc
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_snapshot_reference.h"
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
+    std::shared_ptr<Transaction> transaction) {
+  // Every pending update must be bound to a live transaction.
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdateSnapshotReference without a transaction");
+  // The constructor is private, so std::make_shared cannot reach it.
+  std::shared_ptr<UpdateSnapshotReference> update(
+      new UpdateSnapshotReference(std::move(transaction)));
+  return update;
+}
+
+// Seeds the working copy (updated_refs_) with the base metadata's refs. The builder
+// methods edit this copy, and Apply() diffs it against base().refs. Calling base()
+// here is fine because the PendingUpdate base is constructed before any member.
+UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {}
+
+UpdateSnapshotReference::~UpdateSnapshotReference() = default;
+
+UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
+                                                               int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_branch, SnapshotRef::MakeBranch(snapshot_id));
+  // emplace() leaves an existing entry untouched; a name collision is an error,
+  // whether the existing ref is a branch or a tag.
+  const bool created = updated_refs_.emplace(name, std::move(new_branch)).second;
+  ICEBERG_BUILDER_CHECK(created, "Ref '{}' already exists", name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
+                                                            int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_tag, SnapshotRef::MakeTag(snapshot_id));
+  // emplace() leaves an existing entry untouched; a name collision is an error,
+  // whether the existing ref is a branch or a tag.
+  const bool created = updated_refs_.emplace(name, std::move(new_tag)).second;
+  ICEBERG_BUILDER_CHECK(created, "Ref '{}' already exists", name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  // The main branch is protected from removal.
+  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");
+  auto existing = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(existing != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(existing->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  // Dropping the entry from the working copy is enough: Apply() reports base refs
+  // missing from updated_refs_ as removals.
+  updated_refs_.erase(existing);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  auto existing = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(existing != updated_refs_.end(), "Tag does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(existing->second->type() == SnapshotRefType::kTag,
+                        "Ref '{}' is a branch not a tag", name);
+  // Dropping the entry from the working copy is enough: Apply() reports base refs
+  // missing from updated_refs_ as removals.
+  updated_refs_.erase(existing);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
+    const std::string& name, const std::string& new_name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
+  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
+  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  // Also covers new_name == name, which is rejected as a collision.
+  ICEBERG_BUILDER_CHECK(!updated_refs_.contains(new_name), "Ref '{}' already exists",
+                        new_name);
+  // Detach the ref and erase the old entry BEFORE inserting under the new name.
+  // Inserting into an unordered_map may rehash, which invalidates every iterator,
+  // so `it` must not be used after the insertion.
+  auto ref = std::move(it->second);
+  updated_refs_.erase(it);
+  // The same ref object (snapshot and retention settings) moves to the new name.
+  updated_refs_.emplace(new_name, std::move(ref));
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
+                                                                int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto entry = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  // The working copy was copied from base().refs, so the ref object may still be
+  // shared with the base metadata. Install a re-pointed clone rather than
+  // mutating it in place.
+  auto repointed = entry->second->Clone(snapshot_id);
+  entry->second = std::move(repointed);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
+                                                                const std::string& to) {
+  // Plain replace: `from` may move to `to`'s snapshot with no lineage check.
+  constexpr bool kFastForward = false;
+  return ReplaceBranchInternal(from, to, kFastForward);
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
+                                                              const std::string& to) {
+  // Allowed only when `from`'s current snapshot is an ancestor of `to`'s snapshot.
+  constexpr bool kFastForward = true;
+  return ReplaceBranchInternal(from, to, kFastForward);
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
+    const std::string& from, const std::string& to, bool fast_forward) {
+  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
+  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");
+
+  auto target = updated_refs_.find(to);
+  ICEBERG_BUILDER_CHECK(target != updated_refs_.end(), "Ref does not exist: {}", to);
+  const auto target_snapshot_id = target->second->snapshot_id;
+
+  auto branch = updated_refs_.find(from);
+  if (branch == updated_refs_.end()) {
+    // A missing source branch is simply created at the destination's snapshot
+    // (no ancestry check applies in this case).
+    return CreateBranch(from, target_snapshot_id);
+  }
+  ICEBERG_BUILDER_CHECK(branch->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", from);
+
+  const auto branch_snapshot_id = branch->second->snapshot_id;
+  if (branch_snapshot_id == target_snapshot_id) {
+    // Already pointing at the destination snapshot; nothing to replace.
+    return *this;
+  }
+
+  if (fast_forward) {
+    // Moving forward is only legal when the branch's current snapshot lies on the
+    // history of the destination snapshot.
+    const auto& base_metadata = transaction_->current();
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+        auto is_forward_move,
+        SnapshotUtil::IsAncestorOf(
+            target_snapshot_id, branch_snapshot_id,
+            [&base_metadata](int64_t id) { return base_metadata.SnapshotById(id); }));
+    ICEBERG_BUILDER_CHECK(is_forward_move,
+                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
+  }
+
+  // Clone instead of mutating: the ref may be shared with the base metadata.
+  branch->second = branch->second->Clone(target_snapshot_id);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
+                                                             int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  auto entry = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kTag,
+                        "Ref '{}' is a branch not a tag", name);
+  // The working copy was copied from base().refs, so the ref object may still be
+  // shared with the base metadata. Install a re-pointed clone rather than
+  // mutating it in place.
+  auto repointed = entry->second->Clone(snapshot_id);
+  entry->second = std::move(repointed);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
+    const std::string& name, int32_t min_snapshots_to_keep) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto entry = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  // Copy-on-write: the current ref object may be shared with the base metadata.
+  auto updated = entry->second->Clone();
+  auto& branch_retention = std::get<SnapshotRef::Branch>(updated->retention);
+  branch_retention.min_snapshots_to_keep = min_snapshots_to_keep;
+  entry->second = std::move(updated);
+  ICEBERG_BUILDER_CHECK(entry->second->Validate(),
+                        "Invalid min_snapshots_to_keep {} for branch '{}'",
+                        min_snapshots_to_keep, name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
+    const std::string& name, int64_t max_snapshot_age_ms) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto entry = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  // Copy-on-write: the current ref object may be shared with the base metadata.
+  auto updated = entry->second->Clone();
+  auto& branch_retention = std::get<SnapshotRef::Branch>(updated->retention);
+  branch_retention.max_snapshot_age_ms = max_snapshot_age_ms;
+  entry->second = std::move(updated);
+  ICEBERG_BUILDER_CHECK(entry->second->Validate(),
+                        "Invalid max_snapshot_age_ms {} for branch '{}'",
+                        max_snapshot_age_ms, name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
+                                                                 int64_t max_ref_age_ms) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");
+  auto entry = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Ref does not exist: {}", name);
+  // Copy-on-write: the current ref object may be shared with the base metadata.
+  // Unlike the other retention setters, this one applies to branches and tags.
+  auto updated = entry->second->Clone();
+  if (updated->type() == SnapshotRefType::kBranch) {
+    auto& branch_retention = std::get<SnapshotRef::Branch>(updated->retention);
+    branch_retention.max_ref_age_ms = max_ref_age_ms;
+  } else {
+    auto& tag_retention = std::get<SnapshotRef::Tag>(updated->retention);
+    tag_retention.max_ref_age_ms = max_ref_age_ms;
+  }
+  entry->second = std::move(updated);
+  ICEBERG_BUILDER_CHECK(entry->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
+                        max_ref_age_ms, name);
+  return *this;
+}
+
+Result<UpdateSnapshotReference::ApplyResult> UpdateSnapshotReference::Apply() {
+  // Surface any validation failure recorded by the builder methods.
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  const auto& base_refs = base().refs;
+  ApplyResult changes;
+
+  // A base ref that is no longer in the working copy was removed (or renamed away).
+  for (const auto& entry : base_refs) {
+    if (updated_refs_.find(entry.first) == updated_refs_.end()) {
+      changes.to_remove.push_back(entry.first);
+    }
+  }
+
+  // A working-copy ref that is new, or whose contents differ from the base, must be set.
+  for (const auto& entry : updated_refs_) {
+    const auto base_entry = base_refs.find(entry.first);
+    const bool is_new = base_entry == base_refs.end();
+    if (is_new || *base_entry->second != *entry.second) {
+      changes.to_set.emplace_back(entry.first, entry.second);
+    }
+  }
+
+  return changes;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h
new file mode 100644
index 00000000..e13f5bfa
--- /dev/null
+++ b/src/iceberg/update/update_snapshot_reference.h
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_snapshot_reference.h
+
+namespace iceberg {
+
+/// \brief Stages changes to snapshot references (branches and tags).
+///
+/// The update keeps a working copy of the base metadata's refs. Each builder
+/// method validates its arguments and edits that copy. A failed check does not
+/// fail the call itself, since every method returns *this for chaining; instead
+/// the failure is collected and reported by Apply(). Apply() then diffs the
+/// working copy against the base refs.
+///
+/// TODO(xxx): Add SetSnapshot operations such as SetCurrentSnapshot,
+/// RollbackToTime, RollbackTo to this class so that we can support those
+/// operations for refs.
+class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
+ public:
+  /// \brief Create an update bound to the given transaction.
+  ///
+  /// \param transaction The transaction this update belongs to; must not be null
+  /// \return The new update, or an error if transaction is null
+  static Result<std::shared_ptr<UpdateSnapshotReference>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~UpdateSnapshotReference() override;
+
+  /// \brief Create a branch reference.
+  ///
+  /// Fails if name is empty or a ref (branch or tag) with that name already exists.
+  ///
+  /// \param name The branch name
+  /// \param snapshot_id The snapshot ID for the branch
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& CreateBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Create a tag reference.
+  ///
+  /// Fails if name is empty or a ref (branch or tag) with that name already exists.
+  ///
+  /// \param name The tag name
+  /// \param snapshot_id The snapshot ID for the tag
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& CreateTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Remove a branch reference.
+  ///
+  /// Fails if the branch does not exist, if name refers to a tag, or if name is the
+  /// main branch.
+  ///
+  /// \param name The branch name to remove
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RemoveBranch(const std::string& name);
+
+  /// \brief Remove a tag reference.
+  ///
+  /// Fails if the tag does not exist or if name refers to a branch.
+  ///
+  /// \param name The tag name to remove
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RemoveTag(const std::string& name);
+
+  /// \brief Rename a branch reference.
+  ///
+  /// The same ref object is moved to the new name. Fails if name does not exist,
+  /// refers to a tag, or is the main branch, or if new_name already exists.
+  ///
+  /// \param name The current branch name
+  /// \param new_name The new branch name
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RenameBranch(const std::string& name,
+                                        const std::string& new_name);
+
+  /// \brief Replace a branch reference with a new snapshot ID.
+  ///
+  /// Fails if the branch does not exist or if name refers to a tag.
+  ///
+  /// \param name The branch name
+  /// \param snapshot_id The new snapshot ID
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace a branch reference with another reference's snapshot ID.
+  ///
+  /// No ancestry check is performed. If `from` does not exist, it is created as a
+  /// new branch at `to`'s snapshot. If both already point to the same snapshot,
+  /// this is a no-op. Fails if `to` does not exist or if `from` is a tag.
+  ///
+  /// \param from The branch name to update
+  /// \param to The reference name to copy the snapshot ID from
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceBranch(const std::string& from, const std::string& to);
+
+  /// \brief Fast-forward a branch to another reference's snapshot ID.
+  ///
+  /// Like ReplaceBranch(from, to), but additionally requires that the current
+  /// snapshot of `from` is an ancestor of the snapshot referenced by `to`. This
+  /// ensures the branch only moves forward in history. If `from` does not exist,
+  /// it is created at `to`'s snapshot without an ancestry check.
+  ///
+  /// \param from The branch name to update
+  /// \param to The reference name to copy the snapshot ID from
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& FastForward(const std::string& from, const std::string& to);
+
+  /// \brief Replace a tag reference with a new snapshot ID.
+  ///
+  /// Fails if the tag does not exist or if name refers to a branch.
+  ///
+  /// \param name The tag name
+  /// \param snapshot_id The new snapshot ID
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Set the minimum number of snapshots to keep for a branch.
+  ///
+  /// Only valid for branches. Fails if the updated ref does not pass
+  /// SnapshotRef::Validate().
+  ///
+  /// \param name The branch name
+  /// \param min_snapshots_to_keep The minimum number of snapshots to keep
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMinSnapshotsToKeep(const std::string& name,
+                                                 int32_t min_snapshots_to_keep);
+
+  /// \brief Set the maximum snapshot age in milliseconds for a branch.
+  ///
+  /// Only valid for branches. Fails if the updated ref does not pass
+  /// SnapshotRef::Validate().
+  ///
+  /// \param name The branch name
+  /// \param max_snapshot_age_ms The maximum snapshot age in milliseconds
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMaxSnapshotAgeMs(const std::string& name,
+                                               int64_t max_snapshot_age_ms);
+
+  /// \brief Set the maximum reference age in milliseconds.
+  ///
+  /// Applies to both branches and tags. Fails if the updated ref does not pass
+  /// SnapshotRef::Validate().
+  ///
+  /// \param name The reference name
+  /// \param max_ref_age_ms The maximum reference age in milliseconds
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMaxRefAgeMs(const std::string& name,
+                                          int64_t max_ref_age_ms);
+
+  /// \brief Identifies this update as Kind::kUpdateSnapshotReference.
+  Kind kind() const final { return Kind::kUpdateSnapshotReference; }
+
+  /// \brief Net changes to apply to the table metadata.
+  struct ApplyResult {
+    /// References to set or update (name, ref pairs)
+    std::vector<std::pair<std::string, std::shared_ptr<SnapshotRef>>> to_set;
+    /// Reference names to remove
+    std::vector<std::string> to_remove;
+  };
+
+  /// \brief Apply the pending changes and return the updated and removed references.
+  ///
+  /// \return Refs that are new or differ from the base metadata (to_set) and names
+  /// of base refs no longer present (to_remove), or an error if any builder call
+  /// failed validation.
+  Result<ApplyResult> Apply();
+
+ private:
+  explicit UpdateSnapshotReference(std::shared_ptr<Transaction> transaction);
+
+  /// \brief Shared implementation of ReplaceBranch(from, to) and FastForward().
+  ///
+  /// \param fast_forward When true, require `from`'s snapshot to be an ancestor of
+  /// `to`'s snapshot.
+  UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from,
+                                                 const std::string& to,
+                                                 bool fast_forward);
+
+  /// Working copy of the refs, seeded from the base metadata and edited by the
+  /// builder methods.
+  std::unordered_map<std::string, std::shared_ptr<SnapshotRef>> updated_refs_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index d3d5669c..84e7a10b 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -62,6 +62,17 @@ Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
});
}
+Result<bool> SnapshotUtil::IsAncestorOf(
+    int64_t snapshot_id, int64_t ancestor_snapshot_id,
+    const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
+  // Resolve the starting point; an unknown snapshot_id is an error, not "false".
+  ICEBERG_ASSIGN_OR_RAISE(auto start, lookup(snapshot_id));
+  ICEBERG_CHECK(start != nullptr, "Cannot find snapshot: {}", snapshot_id);
+  // Walk the lineage produced by AncestorsOf and stop at the first match.
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(start, lookup));
+  for (const auto& candidate : lineage) {
+    if (candidate && candidate->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
int64_t ancestor_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index 0a000c69..ce5e0278 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -62,6 +62,17 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<bool> IsAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_snapshot_id);
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using
+  /// the given lookup function.
+  ///
+  /// snapshot_id is resolved through lookup, and its lineage is then walked via
+  /// AncestorsOf. An error from looking up snapshot_id is propagated. An error is also
+  /// returned if snapshot_id resolves to no snapshot.
+  /// NOTE(review): whether snapshot_id counts as its own ancestor depends on whether
+  /// AncestorsOf includes the starting snapshot; confirm.
+  ///
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \param lookup Function to lookup snapshots by ID
+  /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
+  static Result<bool> IsAncestorOf(
+      int64_t snapshot_id, int64_t ancestor_snapshot_id,
+      const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);
+
/// \brief Returns whether ancestor_snapshot_id is an ancestor of the
table's current
/// state.
///