This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2731e9e9 fix(table): add AssertRefSnapshotID requirements to
ExpireSnapshots (#672)
2731e9e9 is described below
commit 2731e9e9946a6ed9098ef7700c220c783cc2b220
Author: Krutika Dhananjay <[email protected]>
AuthorDate: Wed Jan 14 03:05:23 2026 +0530
fix(table): add AssertRefSnapshotID requirements to ExpireSnapshots (#672)
ExpireSnapshots now asserts that all ref snapshot IDs haven't changed
concurrently during the operation. This prevents a race condition where
a ref could be accidentally removed if it is updated to point to a
different snapshot chain by a client while ExpireSnapshots has
identified it as eligible for deletion.
---------
Co-authored-by: Krutika Dhananjay <[email protected]>
---
table/table_test.go | 189 +++++++++++++++++++++++++++++++++++++++++++++++++++
table/transaction.go | 9 ++-
2 files changed, 197 insertions(+), 1 deletion(-)
diff --git a/table/table_test.go b/table/table_test.go
index 437c461f..72997fef 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -970,6 +970,195 @@ func (t *TableWritingTestSuite) TestExpireSnapshots() {
t.Require().Equal(2, len(slices.Collect(tbl.Metadata().SnapshotLogs())))
}
+// validatingCatalog is an in-memory test catalog whose CommitTable validates every requirement against its current metadata before applying updates,
+// simulating how a real catalog rejects conflicting concurrent commits. LoadTable is unused by these tests and always returns (nil, nil).
+type validatingCatalog struct {
+ metadata table.Metadata
+}
+
+func (m *validatingCatalog) LoadTable(ctx context.Context, ident
table.Identifier) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *validatingCatalog) CommitTable(ctx context.Context, ident
table.Identifier, reqs []table.Requirement, updates []table.Update)
(table.Metadata, string, error) {
+ // Reject the whole commit before applying any update if a requirement fails against the current metadata (simulates catalog
behavior)
+ for _, req := range reqs {
+ if err := req.Validate(m.metadata); err != nil {
+ return nil, "", err
+ }
+ }
+
+ meta, err := table.UpdateTableMetadata(m.metadata, updates, "")
+ if err != nil {
+ return nil, "", err
+ }
+
+ m.metadata = meta // later commits are validated against this new state
+
+ return meta, "", nil // no metadata file is written, so the returned location is empty
+}
+
+// TestExpireSnapshotsRejectsOnRefRollback verifies that ExpireSnapshots fails
+// when a ref is rolled back to an ancestor snapshot concurrently.
+//
+// Scenario:
+// - main -> snapshot 5 (newest), chain: 5 <- 4 <- 3 <- 2 <- 1
+// - ExpireSnapshots calculates: keep {5, 4, 3}, delete {2, 1}
+// - Concurrently, client rolls main -> snapshot 2
+// - Without assertion: the commit would remove snapshots 2 and 1, including
+//   snapshot 2, which main now points to
+// - With assertion: commit fails because main's snapshot ID changed
+func (t *TableWritingTestSuite) TestExpireSnapshotsRejectsOnRefRollback() {
+ fs := iceio.LocalFS{}
+
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath :=
fmt.Sprintf("%s/expire_reject_rollback_v%d/data-%d.parquet", t.location,
t.formatVersion, i)
+ t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+ files = append(files, filePath)
+ }
+
+ ident := table.Identifier{"default", "expire_reject_rollback_v" +
strconv.Itoa(t.formatVersion)}
+ meta, err := table.NewMetadata(t.tableSchemaPromotedTypes,
iceberg.UnpartitionedSpec,
+ table.UnsortedSortOrder, t.location,
iceberg.Properties{table.PropertyFormatVersion: strconv.Itoa(t.formatVersion)})
+ t.Require().NoError(err)
+
+ ctx := context.Background()
+ cat := &validatingCatalog{meta}
+
+ tbl := table.New(
+ ident,
+ meta,
+ t.getMetadataLoc(),
+ func(ctx context.Context) (iceio.IO, error) {
+ return fs, nil
+ },
+ cat,
+ )
+
+ // Create 5 snapshots, one AddFiles commit per data file
+ for i := range 5 {
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(ctx, files[i:i+1], nil, false))
+ tbl, err = tx.Commit(ctx)
+ t.Require().NoError(err)
+ }
+ t.Require().Equal(5, len(tbl.Metadata().Snapshots()))
+
+ // Remember snapshot 2: the simulated concurrent client rolls main back to it
+ snapshots := tbl.Metadata().Snapshots()
+ snapshot2 := snapshots[1] // Second snapshot (index 1); assumes Snapshots() is in commit order
+
+ // Start ExpireSnapshots transaction (will calculate based on current
main -> snapshot 5)
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.ExpireSnapshots(table.WithOlderThan(0),
table.WithRetainLast(3)))
+
+ // Simulate concurrent rollback: update catalog's metadata to point
main -> snapshot 2
+ // This simulates another client rolling back main before
ExpireSnapshots commits
+ rollbackUpdates := []table.Update{
+ table.NewSetSnapshotRefUpdate("main", snapshot2.SnapshotID,
table.BranchRef, -1, -1, -1),
+ }
+ cat.metadata, _, err = cat.CommitTable(ctx, ident, nil, rollbackUpdates) // nil reqs: applied without any requirement checks
+ t.Require().NoError(err)
+
+ // Attempt to commit ExpireSnapshots - should fail due to
AssertRefSnapshotID
+ _, err = tx.Commit(ctx)
+ t.Require().Error(err)
+ t.Require().Contains(err.Error(), "requirement failed")
+ t.Require().Contains(err.Error(), "main")
+}
+
+// TestExpireSnapshotsRejectsOnRefUpdate verifies that ExpireSnapshots fails
+// when a ref eligible for deletion is concurrently updated to a newer snapshot.
+//
+// Scenario:
+// - expiring-tag -> old snapshot, eligible for deletion (maxRefAgeMs exceeded)
+// - ExpireSnapshots decides to remove expiring-tag
+// - Concurrently, client updates expiring-tag -> newer snapshot, so the
+//   removal decision is now based on stale ref state
+// - Without assertion: expiring-tag would be deleted despite being updated
+// - With assertion: commit fails because expiring-tag's snapshot ID changed
+func (t *TableWritingTestSuite) TestExpireSnapshotsRejectsOnRefUpdate() {
+ fs := iceio.LocalFS{}
+
+ files := make([]string, 0)
+ for i := range 3 {
+ filePath :=
fmt.Sprintf("%s/expire_reject_update_v%d/data-%d.parquet", t.location,
t.formatVersion, i)
+ t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+ files = append(files, filePath)
+ }
+
+ ident := table.Identifier{"default", "expire_reject_update_v" +
strconv.Itoa(t.formatVersion)}
+ meta, err := table.NewMetadata(t.tableSchemaPromotedTypes,
iceberg.UnpartitionedSpec,
+ table.UnsortedSortOrder, t.location,
iceberg.Properties{table.PropertyFormatVersion: strconv.Itoa(t.formatVersion)})
+ t.Require().NoError(err)
+
+ ctx := context.Background()
+ cat := &validatingCatalog{meta}
+
+ tbl := table.New(
+ ident,
+ meta,
+ t.getMetadataLoc(),
+ func(ctx context.Context) (iceio.IO, error) {
+ return fs, nil
+ },
+ cat,
+ )
+
+ // Create 3 snapshots, one AddFiles commit per data file
+ for i := range 3 {
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(ctx, files[i:i+1], nil, false))
+ tbl, err = tx.Commit(ctx)
+ t.Require().NoError(err)
+ }
+ t.Require().Equal(3, len(tbl.Metadata().Snapshots()))
+
+ snapshots := tbl.Metadata().Snapshots()
+ oldSnapshot := snapshots[0] // Oldest snapshot
+ newerSnapshot := snapshots[2] // Newest snapshot
+
+ // Create a tag pointing to the old snapshot with a short maxRefAgeMs
+ // This tag should become eligible for deletion once its age exceeds maxRefAgeMs
+ maxRefAgeMs := int64(1) // 1ms - exceeded after the 10ms sleep below
+ tagUpdates := []table.Update{
+ table.NewSetSnapshotRefUpdate("expiring-tag",
oldSnapshot.SnapshotID, table.TagRef, maxRefAgeMs, -1, -1),
+ }
+ cat.metadata, _, err = cat.CommitTable(ctx, ident, nil, tagUpdates) // nil reqs: applied without any requirement checks
+ t.Require().NoError(err)
+
+ // Rebuild the table from the catalog's metadata so the transaction below sees expiring-tag
+ tbl = table.New(
+ ident,
+ cat.metadata,
+ t.getMetadataLoc(),
+ func(ctx context.Context) (iceio.IO, error) {
+ return fs, nil
+ },
+ cat,
+ )
+
+ // Wait a bit to ensure the tag's ref age exceeds maxRefAgeMs
+ time.Sleep(10 * time.Millisecond)
+
+ // Start ExpireSnapshots transaction (will identify expiring-tag as
eligible for deletion)
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.ExpireSnapshots(table.WithOlderThan(time.Hour),
table.WithRetainLast(1)))
+
+ // Simulate concurrent update: another client updates the tag to point
to a newer snapshot
+ // The tag removal staged by ExpireSnapshots is now based on a stale ref
+ updateTagUpdates := []table.Update{
+ table.NewSetSnapshotRefUpdate("expiring-tag",
newerSnapshot.SnapshotID, table.TagRef, maxRefAgeMs, -1, -1),
+ }
+ cat.metadata, _, err = cat.CommitTable(ctx, ident, nil,
updateTagUpdates)
+ t.Require().NoError(err)
+
+ // Attempt to commit ExpireSnapshots - should fail due to
AssertRefSnapshotID
+ _, err = tx.Commit(ctx)
+ t.Require().Error(err)
+ t.Require().Contains(err.Error(), "requirement failed")
+ t.Require().Contains(err.Error(), "expiring-tag")
+}
+
func (t *TableWritingTestSuite) TestWriteSpecialCharacterColumn() {
ident := table.Identifier{"default", "write_special_character_column"}
colNameWithSpecialChar := "letter/abc"
diff --git a/table/transaction.go b/table/transaction.go
index d949df3a..b2eccbb6 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -188,6 +188,7 @@ func (t *Transaction) ExpireSnapshots(opts
...ExpireSnapshotsOpt) error {
var (
cfg expireSnapshotsCfg
updates []Update
+ reqs []Requirement
snapsToKeep = make(map[int64]struct{})
nowMs = time.Now().UnixMilli()
)
@@ -197,6 +198,12 @@ func (t *Transaction) ExpireSnapshots(opts
...ExpireSnapshotsOpt) error {
}
for refName, ref := range t.meta.refs {
+ // Assert that this ref's snapshot ID hasn't changed
concurrently.
+ // This ensures we don't accidentally expire snapshots that are
now
+ // referenced by updated refs.
+ snapshotID := ref.SnapshotID
+ reqs = append(reqs, AssertRefSnapshotID(refName, &snapshotID))
+
if refName == MainBranch {
snapsToKeep[ref.SnapshotID] = struct{}{}
}
@@ -270,7 +277,7 @@ func (t *Transaction) ExpireSnapshots(opts
...ExpireSnapshotsOpt) error {
updates = append(updates, NewRemoveSnapshotsUpdate(snapsToDelete))
- return t.apply(updates, nil)
+ return t.apply(updates, reqs)
}
func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table,
batchSize int64, snapshotProps iceberg.Properties) error {