This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b937acb6 feat: support expire snapshots (#490)
b937acb6 is described below
commit b937acb6812dca085e7b64a034e052607b92ef92
Author: dongxiao <[email protected]>
AuthorDate: Wed Jan 14 15:22:46 2026 +0800
feat: support expire snapshots (#490)
This PR is part of the effort to implement snapshot expiration, as described in
issue https://github.com/apache/iceberg-cpp/issues/364.
TODO: File recycling will be added in a followup PR.
---------
Co-authored-by: Gang Wu <[email protected]>
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/snapshot.cc | 13 ++
src/iceberg/snapshot.h | 2 +
src/iceberg/table.cc | 8 +
src/iceberg/table.h | 4 +
src/iceberg/table_metadata.cc | 92 +++++++-
src/iceberg/table_update.cc | 8 +-
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/expire_snapshots_test.cc | 68 ++++++
src/iceberg/test/table_metadata_builder_test.cc | 131 ++++++++++-
src/iceberg/transaction.cc | 27 +++
src/iceberg/transaction.h | 4 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/expire_snapshots.cc | 292 ++++++++++++++++++++++++
src/iceberg/update/expire_snapshots.h | 174 ++++++++++++++
src/iceberg/update/pending_update.h | 1 +
src/iceberg/util/snapshot_util.cc | 8 +
src/iceberg/util/snapshot_util_internal.h | 9 +
19 files changed, 835 insertions(+), 10 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 23d2e4cd..86a0efd7 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
transform.cc
transform_function.cc
type.cc
+ update/expire_snapshots.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index ead2ef2c..87f508cd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -102,6 +102,7 @@ iceberg_sources = files(
'transform.cc',
'transform_function.cc',
'type.cc',
+ 'update/expire_snapshots.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_partition_spec.cc',
diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc
index dfa9a340..e4edd7b6 100644
--- a/src/iceberg/snapshot.cc
+++ b/src/iceberg/snapshot.cc
@@ -52,6 +52,19 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}
+// Returns the maximum age (in milliseconds) configured on this reference, or
+// std::nullopt when none is set.
+//
+// The value is read from whichever retention alternative is currently held;
+// every alternative exposes a `max_ref_age_ms` member, so no per-type dispatch
+// is needed.
+std::optional<int64_t> SnapshotRef::max_ref_age_ms() const noexcept {
+  return std::visit(
+      [](const auto& retention) -> std::optional<int64_t> {
+        return retention.max_ref_age_ms;
+      },
+      retention);
+}
+
Status SnapshotRef::Validate() const {
if (type() == SnapshotRefType::kBranch) {
const auto& branch = std::get<Branch>(this->retention);
diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h
index e2ec0ccb..65bf2f83 100644
--- a/src/iceberg/snapshot.h
+++ b/src/iceberg/snapshot.h
@@ -113,6 +113,8 @@ struct ICEBERG_EXPORT SnapshotRef {
SnapshotRefType type() const noexcept;
+ /// \brief Maximum age of this reference in milliseconds, or std::nullopt if unset.
+ std::optional<int64_t> max_ref_age_ms() const noexcept;
+
/// \brief Create a branch reference
///
/// \param snapshot_id The snapshot ID for the branch
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index f5aacd82..c79ac53f 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -31,6 +31,7 @@
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
+#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
@@ -184,6 +185,13 @@ Result<std::shared_ptr<UpdateSchema>>
Table::NewUpdateSchema() {
return transaction->NewUpdateSchema();
}
+// Creates an ExpireSnapshots update backed by a freshly made transaction.
+//
+// The transaction is created with Kind::kUpdate and auto_commit enabled. An
+// error from Transaction::Make is propagated to the caller.
+Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
+ /*auto_commit=*/true));
+ return transaction->NewExpireSnapshots();
+}
+
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 7727f845..cc948248 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -147,6 +147,10 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
+ /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
+ /// changes.
+ virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+
protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 22eb739b..7e357bba 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -617,6 +617,9 @@ class TableMetadataBuilder::Impl {
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const
std::string& branch);
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
+ Status RemoveRef(const std::string& name);
+ Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
+ Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Result<std::unique_ptr<TableMetadata>> Build();
@@ -1334,6 +1337,84 @@ int32_t
TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
return new_schema_id;
}
+// Drops the named ref from the table metadata.
+//
+// Removing the main branch also resets the current snapshot id to
+// kInvalidSnapshotId. A RemoveSnapshotRef change is recorded only when a ref
+// with that name was actually present.
+Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
+  const bool is_main_branch = (name == SnapshotRef::kMainBranch);
+  if (is_main_branch) {
+    metadata_.current_snapshot_id = kInvalidSnapshotId;
+  }
+
+  const auto erased_count = metadata_.refs.erase(name);
+  if (erased_count == 0) {
+    // Nothing was stored under this name, so there is no change to record.
+    return {};
+  }
+
+  changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
+  return {};
+}
+
+// Removes the snapshots whose ids appear in `snapshot_ids` from the metadata.
+//
+// Ids that do not match any stored snapshot are ignored. A RemoveSnapshots
+// change is recorded only for ids that were actually removed. Afterwards, every
+// ref that points at a snapshot no longer tracked in snapshots_by_id_ is removed
+// through RemoveRef.
+//
+// Returns an error if a null snapshot is encountered in the metadata.
+Status TableMetadataBuilder::Impl::RemoveSnapshots(
+    const std::vector<int64_t>& snapshot_ids) {
+  if (snapshot_ids.empty()) {
+    return {};
+  }
+
+  const std::unordered_set<int64_t> ids_to_remove(snapshot_ids.begin(),
+                                                  snapshot_ids.end());
+  std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
+  // Reserve for the worst case (no id matches). Subtracting snapshot_ids.size()
+  // here would underflow the unsigned size whenever more ids are passed than
+  // snapshots exist (duplicates, unknown ids, empty table) and make reserve()
+  // throw std::length_error.
+  retained_snapshots.reserve(metadata_.snapshots.size());
+  std::vector<int64_t> snapshot_ids_to_remove;
+  snapshot_ids_to_remove.reserve(ids_to_remove.size());
+
+  for (auto& snapshot : metadata_.snapshots) {
+    ICEBERG_CHECK(snapshot != nullptr, "Encountered null snapshot in metadata");
+    const int64_t snapshot_id = snapshot->snapshot_id;
+    if (ids_to_remove.contains(snapshot_id)) {
+      snapshots_by_id_.erase(snapshot_id);
+      snapshot_ids_to_remove.push_back(snapshot_id);
+      // FIXME: implement statistics removal and uncomment below
+      // ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
+      // ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
+    } else {
+      retained_snapshots.push_back(std::move(snapshot));
+    }
+  }
+
+  if (!snapshot_ids_to_remove.empty()) {
+    changes_.push_back(std::make_unique<table::RemoveSnapshots>(snapshot_ids_to_remove));
+  }
+
+  metadata_.snapshots = std::move(retained_snapshots);
+
+  // Remove any refs that are no longer valid (dangling refs). Names are
+  // collected first because RemoveRef erases from metadata_.refs, which must
+  // not happen while iterating over it.
+  std::vector<std::string> dangling_refs;
+  for (const auto& [ref_name, ref] : metadata_.refs) {
+    if (!snapshots_by_id_.contains(ref->snapshot_id)) {
+      dangling_refs.push_back(ref_name);
+    }
+  }
+  for (const auto& ref_name : dangling_refs) {
+    ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name));
+  }
+
+  return {};
+}
+
+// Removes the partition specs whose ids appear in `spec_ids`.
+//
+// Fails the precondition check if the default partition spec is among the ids.
+// Ids that match no stored spec are simply filtered against nothing; the
+// RemovePartitionSpecs change is recorded with the ids exactly as passed in.
+Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
+    const std::vector<int32_t>& spec_ids) {
+  if (spec_ids.empty()) {
+    return {};
+  }
+
+  std::unordered_set<int32_t> spec_ids_to_remove(spec_ids.begin(), spec_ids.end());
+  ICEBERG_PRECHECK(!spec_ids_to_remove.contains(metadata_.default_spec_id),
+                   "Cannot remove the default partition spec");
+
+  // Keep only the specs that were not asked to be removed.
+  const auto is_retained = [&](const auto& spec) {
+    return !spec_ids_to_remove.contains(spec->spec_id());
+  };
+  auto retained_specs =
+      metadata_.partition_specs | std::views::filter(is_retained) |
+      std::ranges::to<std::vector<std::shared_ptr<PartitionSpec>>>();
+  metadata_.partition_specs = std::move(retained_specs);
+
+  changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
+  return {};
+}
+
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
: impl_(std::make_unique<Impl>(format_version)) {}
@@ -1436,7 +1517,8 @@ TableMetadataBuilder&
TableMetadataBuilder::AddPartitionSpec(
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
const std::vector<int32_t>& spec_ids) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionSpecs(spec_ids));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
@@ -1464,7 +1546,7 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
std::shared_ptr<Snapshot> snapshot) {
- ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot)));
return *this;
}
@@ -1487,7 +1569,8 @@ TableMetadataBuilder& TableMetadataBuilder::SetRef(const
std::string& name,
}
TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name)
{
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveRef(name));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
@@ -1497,7 +1580,8 @@ TableMetadataBuilder&
TableMetadataBuilder::RemoveSnapshots(
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
const std::vector<int64_t>& snapshot_ids) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSnapshots(snapshot_ids));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 29388d47..7a01bdee 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -178,7 +178,7 @@ std::unique_ptr<TableUpdate>
SetDefaultPartitionSpec::Clone() const {
// RemovePartitionSpecs
void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.RemovePartitionSpecs(spec_ids_);
}
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context)
const {
@@ -301,7 +301,9 @@ std::unique_ptr<TableUpdate> AddSnapshot::Clone() const {
// RemoveSnapshots
-void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
+void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {
+ builder.RemoveSnapshots(snapshot_ids_);
+}
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
// RemoveSnapshots doesn't generate any requirements
@@ -322,7 +324,7 @@ std::unique_ptr<TableUpdate> RemoveSnapshots::Clone() const
{
// RemoveSnapshotRef
void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.RemoveRef(ref_name_);
}
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context)
const {
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 4f4516c7..1f6ab552 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -170,6 +170,7 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(table_update_test
USE_BUNDLE
SOURCES
+ expire_snapshots_test.cc
transaction_test.cc
update_partition_spec_test.cc
update_properties_test.cc
diff --git a/src/iceberg/test/expire_snapshots_test.cc
b/src/iceberg/test/expire_snapshots_test.cc
new file mode 100644
index 00000000..dbc577a7
--- /dev/null
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/expire_snapshots.h"
+
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+// Tests for ExpireSnapshots::Apply(); the table under test (table_) comes from
+// UpdateTestBase.
+class ExpireSnapshotsTest : public UpdateTestBase {};
+
+// With no explicit configuration, Apply() reports exactly one snapshot to remove.
+TEST_F(ExpireSnapshotsTest, DefaultExpireByAge) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
+ EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
+}
+
+// RetainLast(2) keeps everything: no snapshots and no refs are reported for removal.
+// NOTE(review): presumably the fixture table holds two snapshots - confirm.
+TEST_F(ExpireSnapshotsTest, KeepAll) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+ update->RetainLast(2);
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
+ EXPECT_TRUE(result.refs_to_remove.empty());
+}
+
+// Requesting a specific snapshot id yields that id as the only snapshot to remove.
+TEST_F(ExpireSnapshotsTest, ExpireById) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+ update->ExpireSnapshotId(3051729675574597004);
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
+ EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
+}
+
+// Checks the cutoff on either side of timestamp 1515100955770: a cutoff one
+// millisecond earlier expires nothing, one millisecond later expires one snapshot.
+TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
+ struct TestCase {
+ int64_t expire_older_than;
+ size_t expected_num_expired;
+ };
+ const std::vector<TestCase> test_cases = {
+ {.expire_older_than = 1515100955770 - 1, .expected_num_expired = 0},
+ {.expire_older_than = 1515100955770 + 1, .expected_num_expired = 1}};
+ for (const auto& test_case : test_cases) {
+ // A fresh update per case so the cutoffs do not influence each other.
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+ update->ExpireOlderThan(test_case.expire_older_than);
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.snapshot_ids_to_remove.size(),
test_case.expected_num_expired);
+ }
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/table_metadata_builder_test.cc
b/src/iceberg/test/table_metadata_builder_test.cc
index a0fbe3be..22df7430 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -61,7 +61,8 @@ Result<std::unique_ptr<Schema>> CreateDisorderedSchema() {
}
// Helper function to create base metadata for tests
-std::unique_ptr<TableMetadata> CreateBaseMetadata() {
+std::unique_ptr<TableMetadata> CreateBaseMetadata(
+ std::shared_ptr<PartitionSpec> spec = nullptr) {
auto metadata = std::make_unique<TableMetadata>();
metadata->format_version = 2;
metadata->table_uuid = "test-uuid-1234";
@@ -71,8 +72,13 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
metadata->last_column_id = 3;
metadata->current_schema_id = 0;
metadata->schemas.push_back(CreateTestSchema());
- metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
- metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+ if (spec == nullptr) {
+ metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
+ metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+ } else {
+ metadata->default_spec_id = spec->spec_id();
+ metadata->partition_specs.push_back(std::move(spec));
+ }
metadata->last_partition_id = 0;
metadata->current_snapshot_id = kInvalidSnapshotId;
metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
@@ -1127,4 +1133,123 @@ TEST(TableMetadataBuilderTest,
RemoveSchemasAfterSchemaChange) {
ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema:
1"));
}
+// Removing one of two refs leaves only the other ref in the built metadata.
+TEST(TableMetadataBuilderTest, RemoveSnapshotRef) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add multiple snapshots
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
+
+ // Add one branch ref per snapshot
+ ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(1));
+ ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(2));
+ builder->SetRef("ref1", std::move(ref1));
+ builder->SetRef("ref2", std::move(ref2));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->refs.size(), 2);
+
+ // Remove one ref
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveRef("ref2");
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->refs.size(), 1);
+ EXPECT_TRUE(metadata->refs.contains("ref1"));
+}
+
+// Removing an existing snapshot drops it from the list and from lookup by id.
+TEST(TableMetadataBuilderTest, RemoveSnapshot) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add multiple snapshots
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->snapshots.size(), 2);
+
+ // Remove one snapshot
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ std::vector<int64_t> to_remove{2};
+ builder->RemoveSnapshots(to_remove);
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->snapshots.size(), 1);
+ ASSERT_THAT(metadata->SnapshotById(2), IsError(ErrorKind::kNotFound));
+}
+
+// Removing an unknown snapshot id changes nothing; removing every known id
+// leaves the snapshot list empty.
+TEST(TableMetadataBuilderTest, RemoveSnapshotNotExist) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add multiple snapshots
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
+ builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->snapshots.size(), 2);
+
+ // Remove a snapshot id that does not exist: both snapshots must remain
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSnapshots(std::vector<int64_t>{3});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->snapshots.size(), 2);
+
+ // Remove all existing snapshots
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSnapshots(std::vector<int64_t>{1, 2});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->snapshots.size(), 0);
+}
+
+// Removing a non-default partition spec drops it from the list and from lookup by id.
+TEST(TableMetadataBuilderTest, RemovePartitionSpec) {
+ // Spec 1 becomes the default spec of the base metadata; spec 2 is added on top
+ PartitionField field1(2, 4, "field1", Transform::Identity());
+ PartitionField field2(3, 5, "field2", Transform::Identity());
+ ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(1, {field1}));
+
+ auto base = CreateBaseMetadata(std::move(spec1));
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(2, {field1, field2}));
+ builder->AddPartitionSpec(std::move(spec2));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+ // Remove one spec
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemovePartitionSpecs({2});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->partition_specs.size(), 1);
+ ASSERT_THAT(metadata->PartitionSpecById(2), IsError(ErrorKind::kNotFound));
+}
+
+// Unknown spec ids are ignored, both on their own and when mixed with a known id.
+TEST(TableMetadataBuilderTest, RemovePartitionSpecNotExist) {
+ // Spec 1 becomes the default spec of the base metadata; spec 2 is added on top
+ PartitionField field1(2, 4, "field1", Transform::Identity());
+ PartitionField field2(3, 5, "field2", Transform::Identity());
+ ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(1, {field1}));
+
+ auto base = CreateBaseMetadata(std::move(spec1));
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(2, {field1, field2}));
+ builder->AddPartitionSpec(std::move(spec2));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+ // Remove one non-existing spec
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemovePartitionSpecs({3});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+ // Remove an existing spec (2) together with a non-existing one (3); the
+ // default spec (1) is not listed, so one spec remains
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemovePartitionSpecs({2, 3});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->partition_specs.size(), 1);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 6ef942db..f763c567 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -30,6 +30,7 @@
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
+#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_partition_spec.h"
@@ -163,6 +164,25 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
+ case PendingUpdate::Kind::kExpireSnapshots: {
+ auto& expire_snapshots =
internal::checked_cast<ExpireSnapshots&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
+ if (!result.snapshot_ids_to_remove.empty()) {
+
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+ }
+ if (!result.refs_to_remove.empty()) {
+ for (const auto& ref_name : result.refs_to_remove) {
+ metadata_builder_->RemoveRef(ref_name);
+ }
+ }
+ if (!result.partition_spec_ids_to_remove.empty()) {
+ metadata_builder_->RemovePartitionSpecs(
+ std::move(result.partition_spec_ids_to_remove));
+ }
+ if (!result.schema_ids_to_remove.empty()) {
+
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+ }
+ } break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -253,4 +273,11 @@ Result<std::shared_ptr<UpdateSchema>>
Transaction::NewUpdateSchema() {
return update_schema;
}
+// Creates an ExpireSnapshots bound to this transaction and registers it through
+// AddUpdate. An error from either step is returned to the caller.
+Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ExpireSnapshots> expire_snapshots,
+ ExpireSnapshots::Make(shared_from_this()));
+ ICEBERG_RETURN_UNEXPECTED(AddUpdate(expire_snapshots));
+ return expire_snapshots;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 3c2395c2..057a27a9 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -78,6 +78,10 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
+ /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
+ /// changes.
+ Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+
private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index ff49e1ed..c8854031 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -193,6 +193,7 @@ class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;
+class ExpireSnapshots;
///
----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
diff --git a/src/iceberg/update/expire_snapshots.cc
b/src/iceberg/update/expire_snapshots.cc
new file mode 100644
index 00000000..68cf08ca
--- /dev/null
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/expire_snapshots.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iterator>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+// Factory for ExpireSnapshots.
+//
+// Fails the precondition check when `transaction` is null; otherwise returns a
+// new ExpireSnapshots that takes ownership of the given transaction pointer.
+Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
+ std::shared_ptr<Transaction> transaction) {
+ ICEBERG_PRECHECK(transaction != nullptr,
+ "Cannot create ExpireSnapshots without a transaction");
+ // Plain `new` rather than make_shared.
+ // NOTE(review): presumably because the constructor is not public - confirm.
+ return std::shared_ptr<ExpireSnapshots>(new
ExpireSnapshots(std::move(transaction)));
+}
+
+// Captures the current time once and seeds the retention defaults from the
+// table properties: maximum ref age, minimum number of snapshots to keep, and
+// the expiration cutoff (current time minus the maximum snapshot age).
+//
+// If the table has GC disabled, an error is recorded on this update instead of
+// failing construction.
+ExpireSnapshots::ExpireSnapshots(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)),
+      current_time_ms_(CurrentTimePointMs()),
+      default_max_ref_age_ms_(base().properties.Get(TableProperties::kMaxRefAgeMs)),
+      default_min_num_snapshots_(
+          base().properties.Get(TableProperties::kMinSnapshotsToKeep)),
+      default_expire_older_than_(
+          current_time_ms_ - std::chrono::milliseconds(base().properties.Get(
+                                 TableProperties::kMaxSnapshotAgeMs))) {
+  if (base().properties.Get(TableProperties::kGcEnabled)) {
+    return;
+  }
+  AddError(
+      ValidationFailed("Cannot expire snapshots: GC is disabled (deleting files may "
+                       "corrupt other tables)"));
+}
+
+// Defaulted destructor, defined in this translation unit rather than the header.
+ExpireSnapshots::~ExpireSnapshots() = default;
+
+// Queues `snapshot_id` in snapshot_ids_to_expire_ and records (via
+// specified_snapshot_id_) that an explicit id was requested. Returns *this for
+// chaining.
+ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) {
+ snapshot_ids_to_expire_.push_back(snapshot_id);
+ specified_snapshot_id_ = true;
+ return *this;
+}
+
+// Replaces the default expiration cutoff with `timestamp_millis`, converted by
+// TimePointMsFromUnixMs. Returns *this for chaining.
+ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) {
+ default_expire_older_than_ = TimePointMsFromUnixMs(timestamp_millis);
+ return *this;
+}
+
+// Sets the default minimum number of snapshots to keep. `num_snapshots` must be
+// positive; otherwise ICEBERG_BUILDER_CHECK handles the failure with the message
+// below and the default is left unchanged.
+ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) {
+ ICEBERG_BUILDER_CHECK(num_snapshots > 0,
+ "Number of snapshots to retain must be positive: {}",
+ num_snapshots);
+ default_min_num_snapshots_ = num_snapshots;
+ return *this;
+}
+
+// Stores a caller-supplied callback that takes a string.
+// NOTE(review): presumably the path of a file to delete - nothing in this file
+// invokes it yet; confirm once file cleanup is implemented.
+ExpireSnapshots& ExpireSnapshots::DeleteWith(
+ std::function<void(const std::string&)> delete_func) {
+ delete_func_ = std::move(delete_func);
+ return *this;
+}
+
+// Stores the requested cleanup level. The `enum` keyword in the parameter type
+// is needed because this member function shares its name with the enum type.
+ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) {
+ cleanup_level_ = level;
+ return *this;
+}
+
+// Enables or disables cleanup of expired metadata. When enabled, Apply() also
+// reports partition specs and schemas that no retained snapshot reaches.
+ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) {
+ clean_expired_metadata_ = clean;
+ return *this;
+}
+
+// Computes which ancestors of `snapshot_id` must be kept for one branch.
+//
+// Walks the ancestors in the order returned by SnapshotUtil::AncestorsOf and
+// keeps each one while either fewer than `min_snapshots_to_keep` have been kept
+// so far or the ancestor is not older than `expire_snapshot_older_than`. The
+// walk stops at the first ancestor that satisfies neither condition.
+//
+// Returns the retained snapshot ids, or the error from the ancestor lookup.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::ComputeBranchSnapshotsToRetain(
+    int64_t snapshot_id, TimePointMs expire_snapshot_older_than,
+    int32_t min_snapshots_to_keep) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshots,
+                          SnapshotUtil::AncestorsOf(snapshot_id, [this](int64_t id) {
+                            return base().SnapshotById(id);
+                          }));
+
+  std::unordered_set<int64_t> ids_to_retain;
+  ids_to_retain.reserve(snapshots.size());
+
+  for (const auto& ancestor : snapshots) {
+    ICEBERG_DCHECK(ancestor != nullptr, "Ancestor snapshot is null");
+    // Compare in signed 64-bit arithmetic. Comparing size_t against int32_t
+    // directly converts a negative minimum to a huge unsigned value, which
+    // would make this condition always true and retain every ancestor.
+    const bool below_min_count =
+        static_cast<int64_t>(ids_to_retain.size()) < min_snapshots_to_keep;
+    if (below_min_count || ancestor->timestamp_ms >= expire_snapshot_older_than) {
+      ids_to_retain.insert(ancestor->snapshot_id);
+    } else {
+      break;
+    }
+  }
+
+  return ids_to_retain;
+}
+
+// Collects the snapshot ids to retain across every branch in `refs`.
+//
+// Non-branch refs are skipped. For each branch, the age cutoff and minimum
+// snapshot count come from the branch's own retention settings when present and
+// fall back to this update's defaults otherwise.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::ComputeAllBranchSnapshotIdsToRetain(
+    const SnapshotToRef& refs) const {
+  std::unordered_set<int64_t> snapshot_ids_to_retain;
+  for (const auto& [key, ref] : refs) {
+    if (ref->type() == SnapshotRefType::kBranch) {
+      const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
+
+      // Per-branch cutoff overrides the default one when a max age is set.
+      TimePointMs cutoff = default_expire_older_than_;
+      if (branch.max_snapshot_age_ms.has_value()) {
+        cutoff = current_time_ms_ -
+                 std::chrono::milliseconds(branch.max_snapshot_age_ms.value());
+      }
+
+      const int32_t min_to_keep =
+          branch.min_snapshots_to_keep.value_or(default_min_num_snapshots_);
+
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto to_retain,
+          ComputeBranchSnapshotsToRetain(ref->snapshot_id, cutoff, min_to_keep));
+      snapshot_ids_to_retain.insert(to_retain.begin(), to_retain.end());
+    }
+  }
+  return snapshot_ids_to_retain;
+}
+
+// Finds snapshots that no ref in `refs` reaches but that are still too recent
+// to expire.
+//
+// A snapshot counts as referenced when it is an ancestor of a branch ref
+// (including the branch head) or the direct target of any other ref. Of the
+// remaining snapshots, those not older than the default cutoff are retained.
+//
+// Returns the retained snapshot ids, or the error from an ancestor lookup.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::UnreferencedSnapshotIdsToRetain(
+    const SnapshotToRef& refs) const {
+  std::unordered_set<int64_t> referenced_ids;
+  for (const auto& [key, ref] : refs) {
+    if (ref->type() == SnapshotRefType::kBranch) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto snapshots, SnapshotUtil::AncestorsOf(ref->snapshot_id, [this](int64_t id) {
+            return base().SnapshotById(id);
+          }));
+      for (const auto& snapshot : snapshots) {
+        ICEBERG_DCHECK(snapshot != nullptr, "Ancestor snapshot is null");
+        referenced_ids.insert(snapshot->snapshot_id);
+      }
+    } else {
+      referenced_ids.insert(ref->snapshot_id);
+    }
+  }
+
+  std::unordered_set<int64_t> ids_to_retain;
+  for (const auto& snapshot : base().snapshots) {
+    ICEBERG_DCHECK(snapshot != nullptr, "Snapshot is null");
+    // Use `>=` so the boundary matches ComputeBranchSnapshotsToRetain: a
+    // snapshot created exactly at the cutoff is not "older than" it and must
+    // be kept, whether or not it is referenced.
+    if (!referenced_ids.contains(snapshot->snapshot_id) &&
+        snapshot->timestamp_ms >= default_expire_older_than_) {
+      // unreferenced and not old enough to be expired
+      ids_to_retain.insert(snapshot->snapshot_id);
+    }
+  }
+  return ids_to_retain;
+}
+
+// Decides which refs survive expiration.
+//
+// The main branch is always retained. Any other ref is retained only when the
+// snapshot it points to exists and the age of that snapshot (relative to the
+// time captured at construction) does not exceed the ref's maximum age, taken
+// from the ref itself or from the default. Refs whose snapshot cannot be found
+// are dropped; any other lookup error is returned to the caller.
+Result<ExpireSnapshots::SnapshotToRef> ExpireSnapshots::ComputeRetainedRefs(
+    const SnapshotToRef& refs) const {
+  const TableMetadata& base = this->base();
+  SnapshotToRef retained_refs;
+
+  for (const auto& [key, ref] : refs) {
+    if (key == SnapshotRef::kMainBranch) {
+      retained_refs[key] = ref;
+      continue;
+    }
+
+    auto result = base.SnapshotById(ref->snapshot_id);
+    if (!result.has_value()) {
+      if (result.error().kind != ErrorKind::kNotFound) {
+        ICEBERG_RETURN_UNEXPECTED(result);
+      }
+      // Removing invalid refs that point to non-existing snapshot
+      continue;
+    }
+
+    const std::shared_ptr<Snapshot> snapshot = std::move(result.value());
+    if (snapshot == nullptr) {
+      // A null snapshot is treated like a missing one: the ref is dropped.
+      continue;
+    }
+
+    const auto max_ref_age_ms = ref->max_ref_age_ms().value_or(default_max_ref_age_ms_);
+    const auto ref_age = current_time_ms_ - snapshot->timestamp_ms;
+    if (ref_age <= std::chrono::milliseconds(max_ref_age_ms)) {
+      retained_refs[key] = ref;
+    }
+  }
+
+  return retained_refs;
+}
+
+// Computes what this expiration would remove, without modifying the table.
+//
+// Steps:
+//  1. Fail if any error was recorded on this update.
+//  2. Determine which refs are retained; the snapshots they point to are kept.
+//  3. Reject explicitly requested snapshot ids that a retained ref still
+//     points to.
+//  4. Add the snapshots retained per branch and the unreferenced snapshots
+//     that are still too recent to expire.
+//  5. Report every ref that is not retained, and every snapshot that is either
+//     not retained or was explicitly requested through ExpireSnapshotId.
+//  6. When expired-metadata cleanup is enabled, report partition specs and
+//     schemas that are neither the table defaults nor reachable from a
+//     retained snapshot.
+//
+// Returns the computed ApplyResult, or the first error encountered.
+Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  const TableMetadata& base = this->base();
+  // Attempt to clean expired metadata even if there are no snapshots to expire.
+  // Table metadata builder takes care of the case when this should actually be a no-op
+  if (base.snapshots.empty() && !clean_expired_metadata_) {
+    return {};
+  }
+
+  std::unordered_set<int64_t> ids_to_retain;
+  ICEBERG_ASSIGN_OR_RAISE(auto retained_refs, ComputeRetainedRefs(base.refs));
+  std::unordered_map<int64_t, std::vector<std::string>> retained_id_to_refs;
+  for (const auto& [key, ref] : retained_refs) {
+    int64_t snapshot_id = ref->snapshot_id;
+    // operator[] default-constructs the vector on first use of an id.
+    retained_id_to_refs[snapshot_id].push_back(key);
+    ids_to_retain.insert(snapshot_id);
+  }
+
+  for (int64_t id : snapshot_ids_to_expire_) {
+    ICEBERG_PRECHECK(!retained_id_to_refs.contains(id),
+                     "Cannot expire {}. Still referenced by refs", id);
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto all_branch_snapshot_ids,
+                          ComputeAllBranchSnapshotIdsToRetain(retained_refs));
+  ICEBERG_ASSIGN_OR_RAISE(auto unreferenced_snapshot_ids,
+                          UnreferencedSnapshotIdsToRetain(retained_refs));
+  ids_to_retain.insert(all_branch_snapshot_ids.begin(), all_branch_snapshot_ids.end());
+  ids_to_retain.insert(unreferenced_snapshot_ids.begin(),
+                       unreferenced_snapshot_ids.end());
+
+  ApplyResult result;
+
+  for (const auto& key_to_ref : base.refs) {
+    if (!retained_refs.contains(key_to_ref.first)) {
+      result.refs_to_remove.push_back(key_to_ref.first);
+    }
+  }
+
+  // Ids requested through ExpireSnapshotId must be removed even when the age
+  // and count rules above would keep them; previously they were only validated
+  // and then dropped. Walking the table's own snapshot list keeps the output
+  // limited to snapshots that exist and reports each snapshot once.
+  const std::unordered_set<int64_t> explicit_ids_to_expire(
+      snapshot_ids_to_expire_.begin(), snapshot_ids_to_expire_.end());
+  for (const auto& snapshot : base.snapshots) {
+    if (!snapshot) {
+      continue;
+    }
+    const int64_t snapshot_id = snapshot->snapshot_id;
+    if (explicit_ids_to_expire.contains(snapshot_id) ||
+        !ids_to_retain.contains(snapshot_id)) {
+      result.snapshot_ids_to_remove.push_back(snapshot_id);
+    }
+  }
+
+  if (clean_expired_metadata_) {
+    std::unordered_set<int32_t> reachable_specs = {base.default_spec_id};
+    std::unordered_set<int32_t> reachable_schemas = {base.current_schema_id};
+
+    // TODO(xiao.dong) parallel processing
+    for (int64_t snapshot_id : ids_to_retain) {
+      ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id));
+      SnapshotCache snapshot_cache(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto manifests,
+                              snapshot_cache.Manifests(transaction_->table()->io()));
+      for (const auto& manifest : manifests) {
+        reachable_specs.insert(manifest.partition_spec_id);
+      }
+      if (snapshot->schema_id.has_value()) {
+        reachable_schemas.insert(snapshot->schema_id.value());
+      }
+    }
+
+    for (const auto& spec : base.partition_specs) {
+      if (!reachable_specs.contains(spec->spec_id())) {
+        result.partition_spec_ids_to_remove.emplace_back(spec->spec_id());
+      }
+    }
+    for (const auto& schema : base.schemas) {
+      if (!reachable_schemas.contains(schema->schema_id())) {
+        result.schema_ids_to_remove.insert(schema->schema_id());
+      }
+    }
+  }
+
+  return result;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.h
b/src/iceberg/update/expire_snapshots.h
new file mode 100644
index 00000000..17a4d8b0
--- /dev/null
+++ b/src/iceberg/update/expire_snapshots.h
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+#include "iceberg/util/timepoint.h"
+
+/// \file iceberg/update/expire_snapshots.h
+/// \brief API for removing old snapshots from a table.
+
+namespace iceberg {
+
+/// \brief An enum representing the possible cleanup levels used in snapshot
expiration.
+enum class CleanupLevel : uint8_t {
+ /// Skip all file cleanup; only remove snapshot metadata.
+ kNone,
+ /// Clean up only metadata files (manifests, manifest lists, statistics),
retain data
+ /// files.
+ kMetadataOnly,
+ /// Clean up both metadata and data files (the default for ExpireSnapshots).
+ kAll
+};
+
+/// \brief API for removing old snapshots from a table.
+///
+/// This API accumulates snapshot deletions and commits the new list to the
table. This
+/// API does not allow deleting the current snapshot.
+///
+/// When committing, these changes will be applied to the latest table
metadata. Commit
+/// conflicts will be resolved by applying the changes to the new latest
metadata and
+/// reattempting the commit.
+///
+/// Manifest files that are no longer used by valid snapshots will be deleted.
Data files
+/// that were deleted by snapshots that are expired will be deleted.
DeleteWith() can be
+/// used to pass an alternative deletion method.
+///
+/// Apply() returns the refs, snapshots, partition specs and schemas to be removed.
+class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
+ public:
+ static Result<std::shared_ptr<ExpireSnapshots>> Make(
+ std::shared_ptr<Transaction> transaction);
+
+ ~ExpireSnapshots() override;
+
+ struct ApplyResult {
+ std::vector<std::string> refs_to_remove;  ///< Names of refs that are not retained.
+ std::vector<int64_t> snapshot_ids_to_remove;  ///< IDs of snapshots that are not retained.
+ std::vector<int32_t> partition_spec_ids_to_remove;  ///< Unreachable spec IDs; filled only when cleaning expired metadata.
+ std::unordered_set<int32_t> schema_ids_to_remove;  ///< Unreachable schema IDs; filled only when cleaning expired metadata.
+ };
+
+ /// \brief Expires a specific Snapshot identified by id.
+ ///
+ /// \param snapshot_id ID of the snapshot to expire.
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id);
+
+ /// \brief Expires all snapshots older than the given timestamp.
+ ///
+ /// \param timestamp_millis Expiration cutoff, as a timestamp in milliseconds.
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis);
+
+ /// \brief Retains the most recent ancestors of the current snapshot.
+ ///
+ /// If a snapshot would be expired because it is older than the expiration
timestamp,
+ /// but is one of the num_snapshots most recent ancestors of the current
state, it will
+ /// be retained. This will not prevent snapshots explicitly identified by id
from
+ /// expiring.
+ ///
+ /// This may keep more than num_snapshots ancestors if snapshots are added
concurrently.
+ /// This may keep fewer than num_snapshots ancestors if the current table
state does not
+ /// have that many.
+ ///
+ /// \param num_snapshots The number of snapshots to retain.
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& RetainLast(int num_snapshots);
+
+ /// \brief Passes an alternative delete implementation that will be used for
manifests
+ /// and data files.
+ ///
+ /// Manifest files that are no longer used by valid snapshots will be
deleted. Data
+ /// files that were deleted by snapshots that are expired will be deleted.
+ ///
+ /// If this method is not called, unnecessary manifests and data files will
still be
+ /// deleted. NOTE(review): file deletion appears to be deferred to a follow-up change - confirm.
+ ///
+ /// \param delete_func A function that will be called to delete manifests
and data files
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& DeleteWith(std::function<void(const std::string&)>
delete_func);
+
+ /// \brief Configures the cleanup level for expired files.
+ ///
+ /// This method provides fine-grained control over which files are cleaned
up during
+ /// snapshot expiration.
+ ///
+ /// Consider CleanupLevel::kMetadataOnly when data files are shared across
tables or
+ /// when using procedures like add-files that may reference the same data
files.
+ ///
+ /// Consider CleanupLevel::kNone when data and metadata files may be more
efficiently
+ /// removed using a distributed framework through the actions API.
+ ///
+ /// \param level The cleanup level to use for expired snapshots.
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& CleanupLevel(enum CleanupLevel level);
+
+ /// \brief Enable cleaning up unused metadata, such as partition specs,
schemas, etc.
+ ///
+ /// \param clean Remove unused partition specs, schemas, or other metadata
when true.
+ /// \return Reference to this for method chaining.
+ ExpireSnapshots& CleanExpiredMetadata(bool clean);
+
+ Kind kind() const final { return Kind::kExpireSnapshots; }
+
+ /// \brief Apply the pending changes and return what would be removed.
+ /// \return The refs, snapshot IDs, partition spec IDs and schema IDs to remove.
+ Result<ApplyResult> Apply();
+
+ private:
+ explicit ExpireSnapshots(std::shared_ptr<Transaction> transaction);
+
+ using SnapshotToRef = std::unordered_map<std::string,
std::shared_ptr<SnapshotRef>>;  // NOTE(review): <unordered_map>/<string> are not in this header's include list - presumably transitive; confirm.
+
+ Result<SnapshotToRef> ComputeRetainedRefs(const SnapshotToRef& refs) const;
+
+ Result<std::unordered_set<int64_t>> ComputeBranchSnapshotsToRetain(
+ int64_t snapshot_id, TimePointMs expire_snapshot_older_than,
+ int32_t min_snapshots_to_keep) const;
+
+ Result<std::unordered_set<int64_t>> ComputeAllBranchSnapshotIdsToRetain(
+ const SnapshotToRef& refs) const;
+
+ Result<std::unordered_set<int64_t>> UnreferencedSnapshotIdsToRetain(
+ const SnapshotToRef& refs) const;
+
+ private:
+ const TimePointMs current_time_ms_;
+ const int64_t default_max_ref_age_ms_;
+ int32_t default_min_num_snapshots_;
+ TimePointMs default_expire_older_than_;
+ std::function<void(const std::string&)> delete_func_;
+ std::vector<int64_t> snapshot_ids_to_expire_;
+ enum CleanupLevel cleanup_level_ { CleanupLevel::kAll };
+ bool clean_expired_metadata_{false};  ///< When true, unreachable partition specs and schemas are also marked for removal.
+ bool specified_snapshot_id_{false};
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h
b/src/iceberg/update/pending_update.h
index 2124d7e1..8a8329ee 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -42,6 +42,7 @@ namespace iceberg {
class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
public:
enum class Kind : uint8_t {
+ kExpireSnapshots,
kUpdatePartitionSpec,
kUpdateProperties,
kUpdateSchema,
diff --git a/src/iceberg/util/snapshot_util.cc
b/src/iceberg/util/snapshot_util.cc
index 4395dc27..c3b93be8 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -46,6 +46,14 @@ Result<std::vector<std::shared_ptr<Snapshot>>>
SnapshotUtil::AncestorsOf(
});
}
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+ int64_t snapshot_id,
+ const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {  // ID-based overload: snapshot_id is resolved through lookup first
+ return lookup(snapshot_id).and_then([&lookup](const auto& snapshot) {
+ return AncestorsOf(snapshot, lookup);  // delegate to the overload that takes the resolved snapshot
+ });
+}
+
Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t
snapshot_id,
int64_t ancestor_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
diff --git a/src/iceberg/util/snapshot_util_internal.h
b/src/iceberg/util/snapshot_util_internal.h
index befa96fe..ca106cb3 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -44,6 +44,15 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const
Table& table,
int64_t
snapshot_id);
+ /// \brief Returns the ancestors of the snapshot with the given ID, using a custom lookup.
+ ///
+ /// \param snapshot_id The ID of the snapshot to start from; it is resolved via lookup
+ /// \param lookup A function to look up snapshots by ID
+ /// \return A vector of ancestor snapshots
+ static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+ int64_t snapshot_id,
+ const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);
+
/// \brief Returns whether ancestor_snapshot_id is an ancestor of
snapshot_id.
///
/// \param table The table to check