This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 7e6a7e3a feat: add SnapshotManager (#542)
7e6a7e3a is described below
commit 7e6a7e3a1ad4cb26e16b5883b36891f006ef1c80
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Feb 24 16:49:22 2026 +0800
feat: add SnapshotManager (#542)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/table.cc | 5 +
src/iceberg/table.h | 3 +
src/iceberg/test/CMakeLists.txt | 2 +-
src/iceberg/test/fast_append_test.cc | 8 +-
src/iceberg/test/set_snapshot_test.cc | 155 -----------
src/iceberg/test/snapshot_manager_test.cc | 437 ++++++++++++++++++++++++++++++
src/iceberg/test/update_test_base.h | 80 +++++-
src/iceberg/transaction.cc | 26 +-
src/iceberg/transaction.h | 11 +-
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/snapshot_manager.cc | 209 ++++++++++++++
src/iceberg/update/snapshot_manager.h | 203 ++++++++++++++
src/iceberg/update/snapshot_update.cc | 14 -
src/iceberg/update/snapshot_update.h | 23 +-
17 files changed, 982 insertions(+), 198 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ce995cd4..21e87bee 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -90,6 +90,7 @@ set(ICEBERG_SOURCES
update/fast_append.cc
update/pending_update.cc
update/set_snapshot.cc
+ update/snapshot_manager.cc
update/snapshot_update.cc
update/update_location.cc
update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 6c8adcf7..bfc502fd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -108,6 +108,7 @@ iceberg_sources = files(
'update/fast_append.cc',
'update/pending_update.cc',
'update/set_snapshot.cc',
+ 'update/snapshot_manager.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index b6c26ea0..0550f61d 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -32,6 +32,7 @@
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
+#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
@@ -222,6 +223,10 @@ Result<std::shared_ptr<UpdatePartitionStatistics>>
Table::NewUpdatePartitionStat
return transaction->NewUpdatePartitionStatistics();
}
+Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
+ return SnapshotManager::Make(shared_from_this());
+}
+
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 1f3135dd..423911c2 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
/// \brief Create a new FastAppend to append data files and commit the
changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();
+ /// \brief Create a new SnapshotManager to manage snapshots and snapshot
references.
+ virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
+
protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 28e7cb19..fdd88888 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -179,7 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
- set_snapshot_test.cc
+ snapshot_manager_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
diff --git a/src/iceberg/test/fast_append_test.cc
b/src/iceberg/test/fast_append_test.cc
index 7c79d5e9..6c77fad1 100644
--- a/src/iceberg/test/fast_append_test.cc
+++ b/src/iceberg/test/fast_append_test.cc
@@ -39,10 +39,12 @@ class FastAppendTest : public UpdateTestBase {
protected:
static void SetUpTestSuite() { avro::RegisterAll(); }
+ std::string MetadataResource() const override {
+ return "TableMetadataV2ValidMinimal.json";
+ }
+
void SetUp() override {
- InitializeFileIO();
- // Use minimal metadata for FastAppend tests
- RegisterTableFromResource("TableMetadataV2ValidMinimal.json");
+ UpdateTestBase::SetUp();
// Get partition spec and schema from the base table
ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
diff --git a/src/iceberg/test/set_snapshot_test.cc
b/src/iceberg/test/set_snapshot_test.cc
deleted file mode 100644
index 6bbd59b8..00000000
--- a/src/iceberg/test/set_snapshot_test.cc
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "iceberg/update/set_snapshot.h"
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include "iceberg/result.h"
-#include "iceberg/snapshot.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/test/update_test_base.h"
-#include "iceberg/transaction.h"
-
-namespace iceberg {
-
-class SetSnapshotTest : public UpdateTestBase {
- protected:
- // Snapshot IDs from TableMetadataV2Valid.json
- static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
- static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
-
- // Timestamps from TableMetadataV2Valid.json
- static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
- static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
-};
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
-
- set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
-
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-
- // Commit and verify the change was persisted
- EXPECT_THAT(set_snapshot->Commit(), IsOk());
- EXPECT_THAT(transaction->Commit(), IsOk());
- ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
- ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
- EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
-
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Try to set to a non-existent snapshot
- int64_t invalid_snapshot_id = 9999999999999999;
- set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
-
- auto result = set_snapshot->Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
- EXPECT_THAT(result, HasErrorMessage("is not found"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToValid) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Rollback to the oldest snapshot (which is an ancestor)
- set_snapshot->RollbackTo(kOldestSnapshotId);
-
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Try to rollback to a non-existent snapshot
- int64_t invalid_snapshot_id = 9999999999999999;
- set_snapshot->RollbackTo(invalid_snapshot_id);
-
- auto result = set_snapshot->Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
- EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Rollback to a time between the two snapshots
- // This should select the oldest snapshot
- int64_t time_between = (kOldestSnapshotTimestamp +
kCurrentSnapshotTimestamp) / 2;
- set_snapshot->RollbackToTime(time_between);
-
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Try to rollback to a time before any snapshot
- int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
- set_snapshot->RollbackToTime(time_before_all);
-
- // Should fail - no snapshot older than the specified time
- auto result = set_snapshot->Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
- EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- // Rollback to a timestamp just after the oldest snapshot
- int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
- set_snapshot->RollbackToTime(time_just_after_oldest);
-
- // Apply and verify - should return the oldest snapshot
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
- ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
- ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
- ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
- EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
-
- EXPECT_THAT(set_snapshot->Commit(), IsOk());
- EXPECT_THAT(transaction->Commit(), IsOk());
- ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
- ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
- EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
-}
-
-} // namespace iceberg
diff --git a/src/iceberg/test/snapshot_manager_test.cc
b/src/iceberg/test/snapshot_manager_test.cc
new file mode 100644
index 00000000..bc00db39
--- /dev/null
+++ b/src/iceberg/test/snapshot_manager_test.cc
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_manager.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/snapshot.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+
+namespace iceberg {
+
+class SnapshotManagerTest : public UpdateTestBase {
+ protected:
+ void SetUp() override {
+ UpdateTestBase::SetUp();
+ ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot());
+ current_snapshot_id_ = current->snapshot_id;
+ ASSERT_FALSE(table_->snapshots().empty());
+ oldest_snapshot_id_ = table_->snapshots().front()->snapshot_id;
+ }
+
+ int64_t current_snapshot_id_{};
+ int64_t oldest_snapshot_id_{};
+};
+
+TEST_F(SnapshotManagerTest, CreateBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1");
+ ExpectCommitOk(manager->Commit());
+ ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, table_->NewSnapshotManager());
+ new_manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kCommitFailed,
+ "branch 'branch1' was created concurrently");
+}
+
+TEST_F(SnapshotManagerTest, CreateTag) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, table_->NewSnapshotManager());
+ new_manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kCommitFailed,
+ "tag 'tag1' was created concurrently");
+}
+
+TEST_F(SnapshotManagerTest, RemoveBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->RemoveBranch("branch1");
+ ExpectCommitOk(new_manager->Commit());
+ ExpectNoRef("branch1");
+}
+
+TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RemoveBranch("non-existing");
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "Branch does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, RemovingMainBranchFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RemoveBranch(std::string(SnapshotRef::kMainBranch));
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "Cannot remove main branch");
+}
+
+TEST_F(SnapshotManagerTest, RemoveTag) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->RemoveTag("tag1");
+ ExpectCommitOk(new_manager->Commit());
+ ExpectNoRef("tag1");
+}
+
+TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RemoveTag("non-existing");
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "Tag does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", oldest_snapshot_id_);
+ manager->CreateBranch("branch2", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->ReplaceBranch("branch1", "branch2");
+ ExpectCommitOk(new_manager->Commit());
+ ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->ReplaceBranch("branch1", "non-existing");
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+ "Ref does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest,
ReplaceBranchNonExistingFromBranchCreatesTheBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->ReplaceBranch("new-branch", "branch1");
+ ExpectCommitOk(new_manager->Commit());
+ ExpectBranch("new-branch", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest,
FastForwardBranchNonExistingFromBranchCreatesTheBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->FastForwardBranch("new-branch", "branch1");
+ ExpectCommitOk(new_manager->Commit());
+ ExpectBranch("new-branch", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->FastForwardBranch("branch1", "non-existing");
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+ "Ref does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceTag) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->ReplaceTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(new_manager->Commit());
+ ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, UpdatingBranchRetention) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->SetMinSnapshotsToKeep("branch1", 10);
+ new_manager->SetMaxSnapshotAgeMs("branch1", 20000);
+ ExpectCommitOk(new_manager->Commit());
+ }
+
+ auto metadata = ReloadMetadata();
+ auto ref = metadata->refs.at("branch1");
+ EXPECT_EQ(ref->type(), SnapshotRefType::kBranch);
+ const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
+ EXPECT_EQ(branch.max_snapshot_age_ms, 20000);
+ EXPECT_EQ(branch.min_snapshots_to_keep, 10);
+}
+
+TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->SetMinSnapshotsToKeep("tag1", 10);
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+ "Ref 'tag1' is a tag not a branch");
+ }
+
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->SetMaxSnapshotAgeMs("tag1", 10);
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+ "Ref 'tag1' is a tag not a branch");
+ }
+}
+
+TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->SetMaxRefAgeMs("branch1", 10000);
+ ExpectCommitOk(new_manager->Commit());
+
+ EXPECT_EQ(ReloadMetadata()->refs.at("branch1")->max_ref_age_ms(), 10000);
+}
+
+TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->SetMaxRefAgeMs("tag1", 10000);
+ ExpectCommitOk(new_manager->Commit());
+
+ EXPECT_EQ(ReloadMetadata()->refs.at("tag1")->max_ref_age_ms(), 10000);
+}
+
+TEST_F(SnapshotManagerTest, RenameBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->RenameBranch("branch1", "branch2");
+ ExpectCommitOk(new_manager->Commit());
+
+ ExpectNoRef("branch1");
+ ExpectBranch("branch2", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, FailRenamingMainBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch");
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "Cannot rename main branch");
+}
+
+TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RenameBranch("some-missing-branch", "some-branch");
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "Branch does not exist: some-missing-branch");
+}
+
+TEST_F(SnapshotManagerTest, RollbackToTime) {
+ // The oldest snapshot has timestamp 1515100955770, the current has
1555100955770.
+ // Rolling back to a time between them should land on the oldest snapshot.
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RollbackToTime(1535100955770);
+ ExpectCommitOk(manager->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RollbackToTimeBeforeAllSnapshotsFails) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RollbackToTime(1000);
+ ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+ "no valid snapshot older than");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranchBySnapshotId) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", oldest_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->ReplaceBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(new_manager->Commit());
+ ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RefUpdatesFlushedBeforeSnapshotOperation) {
+ // Interleave a ref operation (CreateBranch) with a snapshot operation
+ // (SetCurrentSnapshot). The ref updates should be flushed before the
snapshot
+ // operation is applied, so both should take effect.
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ manager->SetCurrentSnapshot(oldest_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectBranch("branch1", current_snapshot_id_);
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SnapshotOperationThenRefUpdates) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->SetCurrentSnapshot(oldest_snapshot_id_);
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+ ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RollbackTo) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->RollbackTo(oldest_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SetCurrentSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->SetCurrentSnapshot(oldest_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1", current_snapshot_id_);
+ manager->CreateTag("tag1", current_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+ new_manager->RollbackTo(oldest_snapshot_id_);
+ ExpectCommitOk(new_manager->Commit());
+
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+ ExpectBranch("branch1", current_snapshot_id_);
+ ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn));
+ manager->RollbackTo(oldest_snapshot_id_);
+ ExpectCommitOk(txn->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest,
SnapshotManagerFromTableAllowsMultipleSnapshotOperations) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->SetCurrentSnapshot(oldest_snapshot_id_);
+ manager->SetCurrentSnapshot(current_snapshot_id_);
+ manager->RollbackTo(oldest_snapshot_id_);
+ ExpectCommitOk(manager->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest,
+ SnapshotManagerFromTransactionAllowsMultipleSnapshotOperations) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn));
+ manager->SetCurrentSnapshot(oldest_snapshot_id_);
+ manager->SetCurrentSnapshot(current_snapshot_id_);
+ manager->RollbackTo(oldest_snapshot_id_);
+ ExpectCommitOk(txn->Commit());
+ ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+class SnapshotManagerMinimalTableTest : public MinimalUpdateTestBase {};
+
+TEST_F(SnapshotManagerMinimalTableTest, CreateBranchOnEmptyTable) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1");
+ ExpectCommitOk(manager->Commit());
+
+ auto metadata = ReloadMetadata();
+ EXPECT_FALSE(metadata->refs.contains(std::string(SnapshotRef::kMainBranch)));
+ auto it = metadata->refs.find("branch1");
+ ASSERT_NE(it, metadata->refs.end());
+ EXPECT_EQ(it->second->type(), SnapshotRefType::kBranch);
+}
+
+TEST_F(SnapshotManagerMinimalTableTest,
+ CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) {
+ ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+ manager->CreateBranch("branch1");
+ ExpectCommitOk(manager->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch,
catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto new_manager,
table_with_branch->NewSnapshotManager());
+ new_manager->CreateBranch("branch1");
+ ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+ "Ref branch1 already exists");
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/update_test_base.h
b/src/iceberg/test/update_test_base.h
index c14cb76b..310feb37 100644
--- a/src/iceberg/test/update_test_base.h
+++ b/src/iceberg/test/update_test_base.h
@@ -28,6 +28,8 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
@@ -37,12 +39,18 @@
namespace iceberg {
-// Base test fixture for table update operations
+/// \brief Base test fixture for table update operations.
class UpdateTestBase : public ::testing::Test {
protected:
+ virtual std::string MetadataResource() const { return
"TableMetadataV2Valid.json"; }
+ virtual std::string TableName() const { return "test_table"; }
+
void SetUp() override {
+ table_ident_ = TableIdentifier{.name = TableName()};
+ table_location_ = "/warehouse/" + TableName();
+
InitializeFileIO();
- RegisterTableFromResource("TableMetadataV2Valid.json");
+ RegisterTableFromResource(MetadataResource());
}
/// \brief Initialize file IO and create necessary directories.
@@ -62,10 +70,8 @@ class UpdateTestBase : public ::testing::Test {
///
/// \param resource_name The name of the metadata resource file
void RegisterTableFromResource(const std::string& resource_name) {
- // Drop existing table if it exists
std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
- // Write table metadata to the table location.
auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
table_location_,
Uuid::GenerateV7().ToString());
ICEBERG_UNWRAP_OR_FAIL(auto metadata,
ReadTableMetadataFromResource(resource_name));
@@ -73,16 +79,76 @@ class UpdateTestBase : public ::testing::Test {
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location,
*metadata),
IsOk());
- // Register the table in the catalog.
ICEBERG_UNWRAP_OR_FAIL(table_,
catalog_->RegisterTable(table_ident_,
metadata_location));
}
- const TableIdentifier table_ident_{.name = "test_table"};
- const std::string table_location_{"/warehouse/test_table"};
+  /// \brief Load the table from the catalog again and return its latest metadata.
+  ///
+  /// A load failure is reported as a non-fatal test failure.
+  /// NOTE(review): on failure `value()` is still called on the error result; with
+  /// std::expected-style results that throws, which aborts the current test —
+  /// confirm this is the intended way to stop.
+  std::shared_ptr<TableMetadata> ReloadMetadata() {
+    auto loaded = catalog_->LoadTable(table_ident_);
+    EXPECT_TRUE(loaded.has_value()) << "Failed to reload table";
+    const auto& reloaded_table = loaded.value();
+    return reloaded_table->metadata();
+  }
+
+  /// \brief Reload the table and assert that ref `name` exists, has the given type
+  /// and points at `snapshot_id`. Stops the check early if the ref is missing.
+  void ExpectRef(const std::string& name, SnapshotRefType type, int64_t snapshot_id) {
+    const auto metadata = ReloadMetadata();
+    const auto& refs = metadata->refs;
+    const auto ref_it = refs.find(name);
+    ASSERT_NE(ref_it, refs.end()) << "Ref not found: " << name;
+    const auto& ref = ref_it->second;
+    EXPECT_EQ(ref->type(), type);
+    EXPECT_EQ(ref->snapshot_id, snapshot_id);
+  }
+
+  /// \brief Shorthand for ExpectRef with SnapshotRefType::kBranch.
+  void ExpectBranch(const std::string& name, int64_t snapshot_id) {
+    ExpectRef(name, SnapshotRefType::kBranch, snapshot_id);
+  }
+
+  /// \brief Shorthand for ExpectRef with SnapshotRefType::kTag.
+  void ExpectTag(const std::string& name, int64_t snapshot_id) {
+    ExpectRef(name, SnapshotRefType::kTag, snapshot_id);
+  }
+
+  /// \brief Reload the table and check (non-fatally) that no ref named `name` exists.
+  void ExpectNoRef(const std::string& name) {
+    const bool ref_present = ReloadMetadata()->refs.contains(name);
+    EXPECT_FALSE(ref_present) << "Ref should not exist: " << name;
+  }
+
+  /// \brief Reload the table and assert that its current snapshot is `snapshot_id`.
+  ///
+  /// Fails fatally if the table cannot be loaded or has no current snapshot.
+  void ExpectCurrentSnapshot(int64_t snapshot_id) {
+    auto loaded = catalog_->LoadTable(table_ident_);
+    ASSERT_TRUE(loaded.has_value());
+    const auto& reloaded_table = loaded.value();
+    auto current = reloaded_table->current_snapshot();
+    ASSERT_TRUE(current.has_value());
+    EXPECT_EQ(current.value()->snapshot_id, snapshot_id);
+  }
+
+  /// \brief Check (non-fatally) that a commit result is OK.
+  template <typename CommitResult>
+  void ExpectCommitOk(const CommitResult& result) {
+    EXPECT_THAT(result, IsOk());
+  }
+
+  /// \brief Check (non-fatally) that a commit result failed with error `kind` and
+  /// that its error message contains `message` as a substring.
+  template <typename CommitResult>
+  void ExpectCommitError(const CommitResult& result, ErrorKind kind,
+                         const std::string& message) {
+    EXPECT_THAT(result, IsError(kind));
+    EXPECT_THAT(result, HasErrorMessage(message));
+  }
+
+ TableIdentifier table_ident_;
+ std::string table_location_;
std::shared_ptr<FileIO> file_io_;
std::shared_ptr<InMemoryCatalog> catalog_;
std::shared_ptr<Table> table_;
};
+/// \brief Variant of UpdateTestBase that registers "TableMetadataV2ValidMinimal.json"
+/// under the table name "minimal_table".
+class MinimalUpdateTestBase : public UpdateTestBase {
+ protected:
+  std::string TableName() const override { return "minimal_table"; }
+
+  std::string MetadataResource() const override {
+    return "TableMetadataV2ValidMinimal.json";
+  }
+};
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index b24aa0da..6df45b30 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -36,6 +36,7 @@
#include "iceberg/update/fast_append.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/set_snapshot.h"
+#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
#include "iceberg/update/update_partition_spec.h"
@@ -62,9 +63,7 @@ Transaction::~Transaction() = default;
Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table>
table,
Kind kind, bool
auto_commit) {
- if (!table || !table->catalog()) [[unlikely]] {
- return InvalidArgument("Table and catalog cannot be null");
- }
+ ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be
null");
std::unique_ptr<TableMetadataBuilder> metadata_builder;
if (kind == Kind::kCreate) {
@@ -93,9 +92,9 @@ std::string
Transaction::MetadataFileLocation(std::string_view filename) const {
}
Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
- if (!last_update_committed_) {
- return InvalidArgument("Cannot add update when previous update is not
committed");
- }
+ ICEBERG_CHECK(last_update_committed_,
+ "Cannot add update when previous update is not committed");
+
pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
last_update_committed_ = false;
return {};
@@ -301,13 +300,9 @@ Status
Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& up
}
Result<std::shared_ptr<Table>> Transaction::Commit() {
- if (committed_) {
- return Invalid("Transaction already committed");
- }
- if (!last_update_committed_) {
- return InvalidArgument(
- "Cannot commit transaction when previous update is not committed");
- }
+ ICEBERG_CHECK(!committed_, "Transaction already committed");
+ ICEBERG_CHECK(last_update_committed_,
+ "Cannot commit transaction when previous update is not
committed");
const auto& updates = metadata_builder_->changes();
if (updates.empty()) {
@@ -428,4 +423,9 @@ Transaction::NewUpdateSnapshotReference() {
return update_ref;
}
+Result<std::shared_ptr<SnapshotManager>> Transaction::NewSnapshotManager() {
+  // Unlike the other New* factories, the manager itself is not registered as a
+  // pending update: it creates its own sub-updates (SetSnapshot, FastAppend,
+  // UpdateSnapshotReference) through this transaction's New* factories.
+  std::shared_ptr<Transaction> self = shared_from_this();
+  return SnapshotManager::Make(std::move(self));
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index e975be7f..438054b5 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -94,13 +94,16 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
+ /// \brief Create a new FastAppend to append data files and commit the
changes.
+ Result<std::shared_ptr<FastAppend>> NewFastAppend();
+
+ /// \brief Create a new SnapshotManager to manage snapshots.
+ Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
+
/// \brief Create a new SetSnapshot to set the current snapshot or rollback
to a
/// previous snapshot and commit the changes.
Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
- /// \brief Create a new FastAppend to append data files and commit the
changes.
- Result<std::shared_ptr<FastAppend>> NewFastAppend();
-
/// \brief Create a new UpdateSnapshotReference to update snapshot
references (branches
/// and tags) and commit the changes.
Result<std::shared_ptr<UpdateSnapshotReference>>
NewUpdateSnapshotReference();
@@ -136,7 +139,7 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
const Kind kind_;
// Whether to auto-commit the transaction when updates are applied.
// This is useful when a temporary transaction is created for a single
operation.
- const bool auto_commit_;
+ bool auto_commit_;
// To make the state simple, we require updates are added and committed in
order.
bool last_update_committed_ = true;
// Tracks if transaction has been committed to prevent double-commit
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index e97de0ac..7a3f50df 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -193,6 +193,7 @@ class ExpireSnapshots;
class FastAppend;
class PendingUpdate;
class SetSnapshot;
+class SnapshotManager;
class SnapshotUpdate;
class UpdateLocation;
class UpdatePartitionSpec;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 102471c0..6acb007a 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -21,6 +21,7 @@ install_headers(
'fast_append.h',
'pending_update.h',
'set_snapshot.h',
+ 'snapshot_manager.h',
'snapshot_update.h',
'update_location.h',
'update_partition_spec.h',
diff --git a/src/iceberg/update/snapshot_manager.cc
b/src/iceberg/update/snapshot_manager.cc
new file mode 100644
index 00000000..d882dd32
--- /dev/null
+++ b/src/iceberg/update/snapshot_manager.cc
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_manager.h"
+
+#include "iceberg/result.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/set_snapshot.h"
+#include "iceberg/update/update_snapshot_reference.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
+ std::shared_ptr<Table> table) {
+ ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null");
+ ICEBERG_ASSIGN_OR_RAISE(auto transaction,
+ Transaction::Make(std::move(table),
Transaction::Kind::kUpdate,
+ /*auto_commit=*/false));
+ return std::shared_ptr<SnapshotManager>(
+ new SnapshotManager(std::move(transaction),
/*is_external_transaction=*/false));
+}
+
+Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
+ std::shared_ptr<Transaction> transaction) {
+ ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
+ return std::shared_ptr<SnapshotManager>(
+ new SnapshotManager(std::move(transaction),
/*is_external_transaction=*/true));
+}
+
+SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction,
+ bool is_external_transaction)
+ : transaction_(std::move(transaction)),
+ is_external_transaction_(is_external_transaction) {}
+
+SnapshotManager::~SnapshotManager() = default;
+
+SnapshotManager& SnapshotManager::Cherrypick([[maybe_unused]] int64_t snapshot_id) {
+  // Flush batched ref updates first, like the other snapshot operations.
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  // TODO(anyone): Implement cherrypick operation
+  // Until then, record an error so that Commit() reports it.
+  ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented");
+  return *this;
+}
+
+namespace {
+
+/// \brief Create a SetSnapshot update on `transaction`, let `configure` choose the
+/// target snapshot, and commit the update back into the transaction.
+template <typename Configure>
+Status CommitSetSnapshot(Transaction& transaction, const Configure& configure) {
+  ICEBERG_ASSIGN_OR_RAISE(auto set_snapshot, transaction.NewSetSnapshot());
+  configure(*set_snapshot);
+  return set_snapshot->Commit();
+}
+
+}  // namespace
+
+SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) {
+  // Flush batched ref updates first: the transaction requires the previous update
+  // to be committed before a new one is added.
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitSetSnapshot(
+      *transaction_, [snapshot_id](SetSnapshot& op) { op.SetCurrentSnapshot(snapshot_id); }));
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RollbackToTime(int64_t timestamp_ms) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitSetSnapshot(
+      *transaction_, [timestamp_ms](SetSnapshot& op) { op.RollbackToTime(timestamp_ms); }));
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitSetSnapshot(
+      *transaction_, [snapshot_id](SetSnapshot& op) { op.RollbackTo(snapshot_id); }));
+  return *this;
+}
+
+/// Create branch `name` at the current snapshot or, when the table has no current
+/// snapshot, at a newly created empty snapshot.
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
+  {
+    const auto& base = transaction_->current();
+    if (base.current_snapshot_id != kInvalidSnapshotId) {
+      // Branch from the current snapshot. This is a plain ref update, batched with
+      // any other pending ref operations.
+      ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base.Snapshot());
+      ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not be null");
+      return CreateBranch(name, current_snapshot->snapshot_id);
+    }
+  }
+
+  // No current snapshot: the branch must point at a new empty snapshot produced by a
+  // FastAppend. The transaction only accepts a new update once the previous one is
+  // committed, so flush batched ref updates first (as SetCurrentSnapshot,
+  // RollbackTo and RollbackToTime do). Otherwise a chain like
+  // CreateTag(...).CreateBranch(...) would fail here.
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+
+  // Re-read the metadata after flushing before checking for an existing ref.
+  const auto& latest = transaction_->current();
+  ICEBERG_BUILDER_CHECK(!latest.refs.contains(name), "Ref {} already exists", name);
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->SetTargetBranch(name).Commit());
+  return *this;
+}
+
+// Reference operations. Each one is queued on the lazily created, shared
+// UpdateSnapshotReference, so consecutive calls are batched into one update. A failure
+// to obtain that update is recorded on this builder.
+
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name,
+                                               int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->CreateBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::CreateTag(const std::string& name,
+                                            int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->CreateTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RemoveBranch(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RemoveTag(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name,
+                                             int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name,
+                                                int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from,
+                                                const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceBranch(from, to);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from,
+                                                    const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->FastForward(from, to);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RenameBranch(const std::string& name,
+                                               const std::string& new_name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RenameBranch(name, new_name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name,
+                                                        int32_t min_snapshots_to_keep) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name,
+                                                      int64_t max_snapshot_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
+                                                 int64_t max_ref_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMaxRefAgeMs(name, max_ref_age_ms);
+  return *this;
+}
+
+Status SnapshotManager::Commit() {
+  // Report any error recorded by the chained builder calls first.
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
+  if (is_external_transaction_) {
+    // The caller owns the transaction and commits it.
+    return {};
+  }
+  ICEBERG_RETURN_UNEXPECTED(transaction_->Commit());
+  return {};
+}
+
+Result<std::shared_ptr<UpdateSnapshotReference>>
+SnapshotManager::UpdateSnapshotReferencesOperation() {
+  // Reuse the pending ref update so consecutive ref operations are batched.
+  if (update_snap_refs_) {
+    return update_snap_refs_;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(update_snap_refs_, transaction_->NewUpdateSnapshotReference());
+  return update_snap_refs_;
+}
+
+Status SnapshotManager::CommitIfRefUpdatesExist() {
+  if (update_snap_refs_ == nullptr) {
+    return {};
+  }
+  ICEBERG_RETURN_UNEXPECTED(update_snap_refs_->Commit());
+  // Start a fresh ref update for the next ref operation.
+  update_snap_refs_.reset();
+  return {};
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/snapshot_manager.h
b/src/iceberg/update/snapshot_manager.h
new file mode 100644
index 00000000..fd81f833
--- /dev/null
+++ b/src/iceberg/update/snapshot_manager.h
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+
+/// \file iceberg/update/snapshot_manager.h
+/// \brief API for managing snapshots and snapshot references.
+
+namespace iceberg {
+
+/// \brief API for managing snapshots and snapshot references.
+///
+/// Allows rolling table data back to the state at an older table snapshot.
+///
+/// Rollback: This API does not allow conflicting calls to SetCurrentSnapshot and
+/// RollbackToTime. When committing, these changes will be applied to the current table
+/// metadata. Commit conflicts will not be resolved and will cause the commit to fail.
+///
+/// Cherrypick: In an audit workflow, new data is written to an orphan snapshot that is
+/// not committed as the table's current state until it is audited. After auditing a
+/// change, it may need to be applied or cherry-picked on top of the latest snapshot
+/// instead of the one that was current when the audited changes were created. This class
+/// adds support for cherry-picking the changes from an orphan snapshot by applying them
+/// to the current snapshot. The output of the operation is a new snapshot with the
+/// changes from the cherry-picked snapshot.
+///
+/// References: branch and tag operations are batched into a single pending reference
+/// update. That update is committed before SetCurrentSnapshot, RollbackToTime,
+/// RollbackTo and Cherrypick run, and by Commit().
+///
+/// Errors from chained calls are collected (see ErrorCollector) and returned by Commit().
+class ICEBERG_EXPORT SnapshotManager : public ErrorCollector {
+ public:
+  /// \brief Create a SnapshotManager that owns its own transaction.
+  ///
+  /// Commit() commits that transaction.
+  static Result<std::shared_ptr<SnapshotManager>> Make(std::shared_ptr<Table> table);
+
+  /// \brief Create a SnapshotManager from an existing transaction.
+  ///
+  /// \note The caller is responsible for committing the transaction.
+  static Result<std::shared_ptr<SnapshotManager>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~SnapshotManager() override;
+
+  /// \brief Apply supported changes in the given snapshot and create a new snapshot
+  /// which will be set as the current snapshot on commit.
+  ///
+  /// \note Not yet implemented: calling this records an error that Commit() returns.
+  ///
+  /// \param snapshot_id id of the snapshot whose changes to apply
+  /// \return Reference to this for method chaining
+  SnapshotManager& Cherrypick(int64_t snapshot_id);
+
+  /// \brief Roll this table's data back to a specific snapshot identified by id.
+  ///
+  /// \param snapshot_id id of the snapshot to roll the table data back to
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id);
+
+  /// \brief Roll this table's data back to the last snapshot before the given
+  /// timestamp.
+  ///
+  /// \param timestamp_ms timestamp in milliseconds
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackToTime(int64_t timestamp_ms);
+
+  /// \brief Roll the table's state back to a specific snapshot identified by id.
+  ///
+  /// \param snapshot_id id of the snapshot to roll the table back to; must be an
+  /// ancestor of the current snapshot
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackTo(int64_t snapshot_id);
+
+  /// \brief Create a new branch. The branch will point to the current snapshot if the
+  /// table has one. Otherwise, the branch will point to a newly created empty snapshot.
+  ///
+  /// \param name branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name);
+
+  /// \brief Create a new branch pointing to the given snapshot id.
+  ///
+  /// \param name branch name
+  /// \param snapshot_id id of the snapshot which will be the head of the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Create a new tag pointing to the given snapshot id.
+  ///
+  /// \param name tag name
+  /// \param snapshot_id snapshot id for the head of the new tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Remove a branch by name.
+  ///
+  /// \param name branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveBranch(const std::string& name);
+
+  /// \brief Remove the tag with the given name.
+  ///
+  /// \param name tag name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveTag(const std::string& name);
+
+  /// \brief Replace the tag with the given name to point to the specified snapshot.
+  ///
+  /// \param name tag to replace
+  /// \param snapshot_id new snapshot id for the given tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace the branch with the given name to point to the specified snapshot.
+  ///
+  /// \param name branch to replace
+  /// \param snapshot_id new snapshot id for the given branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace the 'from' branch to point to the 'to' snapshot. The 'to' will
+  /// remain unchanged, and the 'from' branch will retain its retention properties. If
+  /// the 'from' branch does not exist, it will be created with default retention
+  /// properties.
+  ///
+  /// \param from branch to replace
+  /// \param to the branch 'from' should be replaced with
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to);
+
+  /// \brief Perform a fast-forward of 'from' up to the 'to' snapshot if 'from' is an
+  /// ancestor of 'to'. The 'to' will remain unchanged, and 'from' will retain its
+  /// retention properties. If the 'from' branch does not exist, it will be created with
+  /// default retention properties.
+  ///
+  /// \param from branch to fast-forward
+  /// \param to ref for the 'from' branch to be fast-forwarded to
+  /// \return Reference to this for method chaining
+  SnapshotManager& FastForwardBranch(const std::string& from, const std::string& to);
+
+  /// \brief Rename a branch.
+  ///
+  /// \param name name of the branch to rename
+  /// \param new_name the desired new name of the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& RenameBranch(const std::string& name, const std::string& new_name);
+
+  /// \brief Update the minimum number of snapshots to keep for a branch.
+  ///
+  /// \param branch_name branch name
+  /// \param min_snapshots_to_keep minimum number of snapshots to retain on the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name,
+                                         int32_t min_snapshots_to_keep);
+
+  /// \brief Update the max snapshot age for a branch.
+  ///
+  /// \param branch_name branch name
+  /// \param max_snapshot_age_ms maximum snapshot age in milliseconds to retain on the
+  /// branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name,
+                                       int64_t max_snapshot_age_ms);
+
+  /// \brief Update the retention policy for a reference.
+  ///
+  /// \param name name of the reference (branch or tag)
+  /// \param max_ref_age_ms retention age in milliseconds of the reference itself
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms);
+
+  /// \brief Commit all pending changes.
+  ///
+  /// Returns the first error recorded by a chained call, if any. Otherwise commits the
+  /// pending reference update. The transaction is committed as well only if this
+  /// manager owns it (created from a Table).
+  Status Commit();
+
+ private:
+  SnapshotManager(std::shared_ptr<Transaction> transaction, bool is_external_transaction);
+
+  /// \brief Get or create the UpdateSnapshotReference operation.
+  Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReferencesOperation();
+
+  /// \brief Commit any pending reference updates if they exist.
+  Status CommitIfRefUpdatesExist();
+
+  std::shared_ptr<Transaction> transaction_;
+  const bool is_external_transaction_;
+  std::shared_ptr<UpdateSnapshotReference> update_snap_refs_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.cc
b/src/iceberg/update/snapshot_update.cc
index 4865278f..b4468256 100644
--- a/src/iceberg/update/snapshot_update.cc
+++ b/src/iceberg/update/snapshot_update.cc
@@ -331,20 +331,6 @@ Status SnapshotUpdate::Finalize(std::optional<Error>
commit_error) {
return {};
}
-Status SnapshotUpdate::SetTargetBranch(const std::string& branch) {
- ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty");
-
- if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) {
- ICEBERG_PRECHECK(
- ref_it->second->type() == SnapshotRefType::kBranch,
- "{} is a tag, not a branch. Tags cannot be targets for producing
snapshots",
- branch);
- }
-
- target_branch_ = branch;
- return {};
-}
-
Result<std::unordered_map<std::string, std::string>>
SnapshotUpdate::ComputeSummary(
const TableMetadata& previous) {
std::unordered_map<std::string, std::string> summary = Summary();
diff --git a/src/iceberg/update/snapshot_update.h
b/src/iceberg/update/snapshot_update.h
index 12c3b19d..fdbb2660 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -76,6 +76,28 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
return self;
}
+  /// \brief Target a particular branch with the snapshot produced by this update.
+  ///
+  /// The branch does not need to exist yet. An empty name, or the name of an existing
+  /// ref that is not a branch (i.e. a tag), records a kInvalidArgument error and leaves
+  /// the target branch unchanged.
+  ///
+  /// \param branch Name of a SnapshotRef of type branch
+  /// \return Reference to this for method chaining
+  auto& SetTargetBranch(this auto& self, const std::string& branch) {
+    if (branch.empty()) [[unlikely]] {
+      return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be empty");
+    }
+
+    const auto& refs = self.base().refs;
+    const auto existing = refs.find(branch);
+    const bool names_non_branch =
+        existing != refs.end() && existing->second->type() != SnapshotRefType::kBranch;
+    if (names_non_branch) {
+      return self.AddError(ErrorKind::kInvalidArgument,
+                           "{} is a tag, not a branch. Tags cannot be targets for "
+                           "producing snapshots",
+                           branch);
+    }
+
+    self.target_branch_ = branch;
+    return self;
+  }
+
/// \brief Set a summary property.
///
/// \param property The property name
@@ -121,7 +143,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
std::span<const std::shared_ptr<DataFile>> files,
const std::shared_ptr<PartitionSpec>& spec);
- Status SetTargetBranch(const std::string& branch);
const std::string& target_branch() const { return target_branch_; }
bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
const std::string& commit_uuid() const { return commit_uuid_; }