laskoviymishka commented on code in PR #2591: URL: https://github.com/apache/iceberg-rust/pull/2591#discussion_r3383281731
########## crates/iceberg/src/transaction/expire_snapshots.rs: ########## @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::spec::TableMetadata; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// Default number of most recent snapshots to retain when none is specified. +const DEFAULT_RETAIN_LAST: usize = 1; + +/// A transaction action that removes snapshots from table metadata. +/// +/// This only rewrites metadata; the now-unreferenced data and metadata files are left untouched. +/// Physical file cleanup is the responsibility of a higher-level maintenance operation built on +/// top of this action. +/// +/// Selection follows Java `RemoveSnapshots`, with a simpler retention model: +/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and age-based expiry +/// ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a snapshot is expired +/// if it is named explicitly *or* it is older than the cutoff. +/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even when they are older +/// than the cutoff; it does not protect snapshots named explicitly. +/// - Snapshots referenced by a branch or tag (including the current snapshot) are never expired, +/// and naming such an id explicitly is an error, since +/// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would otherwise +/// drop the ref silently. +/// +/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, per-branch +/// `min_snapshots_to_keep`) are not yet implemented. +pub struct ExpireSnapshotsAction { + snapshot_ids: Vec<i64>, + older_than_ms: Option<i64>, + retain_last: Option<usize>, +} + +impl ExpireSnapshotsAction { + pub(crate) fn new() -> Self { + Self { + snapshot_ids: vec![], + older_than_ms: None, + retain_last: None, + } + } + + /// Expire these snapshot ids in addition to any age-based selection. + /// + /// An id that is still referenced by a branch or tag cannot be expired and causes + /// [`commit`](TransactionAction::commit) to fail. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.snapshot_ids = snapshot_ids; + self + } + + /// Expire snapshots whose timestamp is strictly older than `older_than_ms`. + pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self { + self.older_than_ms = Some(older_than_ms); + self + } + + /// Keep at least the `retain_last` most recent snapshots when expiring by age (defaults to 1). + /// + /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); it has no effect on + /// its own and does not protect snapshots named via + /// [`expire_snapshot_ids`](Self::expire_snapshot_ids). + pub fn retain_last(mut self, retain_last: usize) -> Self { + self.retain_last = Some(retain_last); + self + } + + fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> { + let metadata = table.metadata(); + let protected = Self::protected_snapshot_ids(metadata); + let existing: HashSet<i64> = metadata.snapshots().map(|s| s.snapshot_id()).collect(); + + let mut to_expire: HashSet<i64> = HashSet::new(); + + // Explicit ids are expired regardless of age, but one still referenced by a branch or tag + // cannot be expired (Java's RemoveSnapshots errors rather than silently dropping the ref). + for id in &self.snapshot_ids { + if protected.contains(id) { + return Err(Self::reference_error(metadata, *id)); + } + if existing.contains(id) { + to_expire.insert(*id); + } + } + + // Age-based expiry, additive with the explicit ids. `retain_last` keeps the most recent + // snapshots even when older than the cutoff. Without a cutoff there is no age expiry. + if let Some(older_than_ms) = self.older_than_ms { + let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST); + let mut snapshots: Vec<_> = metadata.snapshots().cloned().collect(); + if snapshots.len() > retain_last { + snapshots.sort_by_key(|s| s.timestamp_ms()); + snapshots.truncate(snapshots.len() - retain_last); + for snapshot in snapshots { + if snapshot.timestamp_ms() < older_than_ms + && !protected.contains(&snapshot.snapshot_id()) + { + to_expire.insert(snapshot.snapshot_id()); + } + } + } + } + + let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect(); + snapshot_ids.sort_unstable(); + Ok(snapshot_ids) + } + + /// Snapshot ids that must never be expired: the current snapshot and every branch head and tag + /// target. Removing any of these would silently drop a ref in + /// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots). + fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> { + let mut ids: HashSet<i64> = metadata.refs.values().map(|r| r.snapshot_id).collect(); + if let Some(current) = metadata.current_snapshot_id() { + ids.insert(current); + } + ids + } + + fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error { + if metadata.current_snapshot_id() == Some(snapshot_id) { + return Error::new(ErrorKind::DataInvalid, "Cannot expire the current snapshot"); + } + let refs: Vec<&str> = metadata + .refs + .iter() + .filter(|(_, r)| r.snapshot_id == snapshot_id) + .map(|(name, _)| name.as_str()) + .collect(); + Error::new( + ErrorKind::DataInvalid, + format!("Cannot expire snapshot {snapshot_id}: still referenced by {refs:?}"), + ) + } +} + +#[async_trait] +impl TransactionAction for ExpireSnapshotsAction { + async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { + let snapshot_ids = self.snapshot_ids_to_expire(table)?; + + if snapshot_ids.is_empty() { + return Ok(ActionCommit::new(vec![], vec![])); + } + + Ok(ActionCommit::new( + vec![TableUpdate::RemoveSnapshots { snapshot_ids }], + vec![TableRequirement::UuidMatch { + uuid: table.metadata().uuid(), + }], + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::TableUpdate; + use crate::transaction::Transaction; + use crate::transaction::action::{ApplyTransactionAction, TransactionAction}; + use crate::transaction::expire_snapshots::ExpireSnapshotsAction; + use crate::transaction::tests::make_v2_table; + + // `make_v2_table` carries an older snapshot (ts 1515100955770) and a current + // snapshot (ts 1555100955770). + const OLD_SNAPSHOT: i64 = 3051729675574597004; + const CURRENT_SNAPSHOT: i64 = 3055729675574597004; + + async fn removed_ids(action: ExpireSnapshotsAction) -> Vec<i64> { + let table = make_v2_table(); + let mut commit = Arc::new(action).commit(&table).await.unwrap(); + match commit.take_updates().into_iter().next() { + Some(TableUpdate::RemoveSnapshots { snapshot_ids }) => snapshot_ids, + None => vec![], + other => panic!("unexpected update: {other:?}"), + } + } + + fn action() -> ExpireSnapshotsAction { + ExpireSnapshotsAction::new() + } + + #[tokio::test] + async fn test_expire_explicit_snapshot_id() { + assert_eq!( + removed_ids(action().expire_snapshot_ids(vec![OLD_SNAPSHOT])).await, + vec![OLD_SNAPSHOT] + ); + } + + #[tokio::test] + async fn test_explicit_unknown_id_is_ignored() { + assert!( + removed_ids(action().expire_snapshot_ids(vec![42])) + .await + .is_empty() + ); + } + + #[tokio::test] + async fn test_cannot_expire_current_snapshot() { + let table = make_v2_table(); Review Comment: The PR description says the combined explicit + age path is tested, but I don't see a test that sets both — and it's the primary interaction between the two code paths, so it's the most likely regression site. I'd add one with non-overlapping targets: `expire_snapshot_ids(vec![X]).expire_older_than_ms(cutoff)` where an age-qualifying snapshot Y is distinct from X, asserting both land in the result. ########## crates/iceberg/src/transaction/expire_snapshots.rs: ########## @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::spec::TableMetadata; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// Default number of most recent snapshots to retain when none is specified. +const DEFAULT_RETAIN_LAST: usize = 1; + +/// A transaction action that removes snapshots from table metadata. +/// +/// This only rewrites metadata; the now-unreferenced data and metadata files are left untouched. +/// Physical file cleanup is the responsibility of a higher-level maintenance operation built on +/// top of this action. +/// +/// Selection follows Java `RemoveSnapshots`, with a simpler retention model: +/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and age-based expiry +/// ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a snapshot is expired +/// if it is named explicitly *or* it is older than the cutoff. +/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even when they are older +/// than the cutoff; it does not protect snapshots named explicitly. +/// - Snapshots referenced by a branch or tag (including the current snapshot) are never expired, +/// and naming such an id explicitly is an error, since +/// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would otherwise +/// drop the ref silently. +/// +/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, per-branch +/// `min_snapshots_to_keep`) are not yet implemented. +pub struct ExpireSnapshotsAction { + snapshot_ids: Vec<i64>, + older_than_ms: Option<i64>, + retain_last: Option<usize>, +} + +impl ExpireSnapshotsAction { + pub(crate) fn new() -> Self { + Self { + snapshot_ids: vec![], + older_than_ms: None, + retain_last: None, + } + } + + /// Expire these snapshot ids in addition to any age-based selection. + /// + /// An id that is still referenced by a branch or tag cannot be expired and causes + /// [`commit`](TransactionAction::commit) to fail. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.snapshot_ids = snapshot_ids; + self + } + + /// Expire snapshots whose timestamp is strictly older than `older_than_ms`. + pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self { + self.older_than_ms = Some(older_than_ms); + self + } + + /// Keep at least the `retain_last` most recent snapshots when expiring by age (defaults to 1). + /// + /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); it has no effect on + /// its own and does not protect snapshots named via + /// [`expire_snapshot_ids`](Self::expire_snapshot_ids). + pub fn retain_last(mut self, retain_last: usize) -> Self { + self.retain_last = Some(retain_last); + self + } + + fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> { + let metadata = table.metadata(); + let protected = Self::protected_snapshot_ids(metadata); + let existing: HashSet<i64> = metadata.snapshots().map(|s| s.snapshot_id()).collect(); + + let mut to_expire: HashSet<i64> = HashSet::new(); + + // Explicit ids are expired regardless of age, but one still referenced by a branch or tag + // cannot be expired (Java's RemoveSnapshots errors rather than silently dropping the ref). + for id in &self.snapshot_ids { + if protected.contains(id) { + return Err(Self::reference_error(metadata, *id)); + } + if existing.contains(id) { + to_expire.insert(*id); + } + } + + // Age-based expiry, additive with the explicit ids. `retain_last` keeps the most recent + // snapshots even when older than the cutoff. Without a cutoff there is no age expiry. + if let Some(older_than_ms) = self.older_than_ms { + let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST); + let mut snapshots: Vec<_> = metadata.snapshots().cloned().collect(); + if snapshots.len() > retain_last { + snapshots.sort_by_key(|s| s.timestamp_ms()); + snapshots.truncate(snapshots.len() - retain_last); + for snapshot in snapshots { + if snapshot.timestamp_ms() < older_than_ms + && !protected.contains(&snapshot.snapshot_id()) + { + to_expire.insert(snapshot.snapshot_id()); + } + } + } + } + + let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect(); + snapshot_ids.sort_unstable(); + Ok(snapshot_ids) + } + + /// Snapshot ids that must never be expired: the current snapshot and every branch head and tag + /// target. Removing any of these would silently drop a ref in + /// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots). + fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> { + let mut ids: HashSet<i64> = metadata.refs.values().map(|r| r.snapshot_id).collect(); + if let Some(current) = metadata.current_snapshot_id() { + ids.insert(current); + } + ids + } + + fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error { + if metadata.current_snapshot_id() == Some(snapshot_id) { + return Error::new(ErrorKind::DataInvalid, "Cannot expire the current snapshot"); + } + let refs: Vec<&str> = metadata + .refs + .iter() + .filter(|(_, r)| r.snapshot_id == snapshot_id) + .map(|(name, _)| name.as_str()) + .collect(); + Error::new( + ErrorKind::DataInvalid, + format!("Cannot expire snapshot {snapshot_id}: still referenced by {refs:?}"), + ) + } +} + +#[async_trait] +impl TransactionAction for ExpireSnapshotsAction { + async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { + let snapshot_ids = self.snapshot_ids_to_expire(table)?; + + if snapshot_ids.is_empty() { + return Ok(ActionCommit::new(vec![], vec![])); + } + + Ok(ActionCommit::new( + vec![TableUpdate::RemoveSnapshots { snapshot_ids }], + vec![TableRequirement::UuidMatch { + uuid: table.metadata().uuid(), + }], + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::TableUpdate; + use crate::transaction::Transaction; + use crate::transaction::action::{ApplyTransactionAction, TransactionAction}; + use crate::transaction::expire_snapshots::ExpireSnapshotsAction; + use crate::transaction::tests::make_v2_table; + + // `make_v2_table` carries an older snapshot (ts 1515100955770) and a current + // snapshot (ts 1555100955770). Review Comment: `UuidMatch` alone leaves a race window. If another writer commits a new snapshot between `snapshot_ids_to_expire` and `update_table`, that snapshot is now live but its parent may be in our to-expire set — removing the parent orphans the new branch history, and the UUID check won't catch it because the table UUID is unchanged. `SnapshotProducer` guards against exactly this with a `RefSnapshotIdMatch` on `main` (see snapshot.rs ~517). I'd add the same here: ```rust TableRequirement::RefSnapshotIdMatch { r#ref: "main".to_string(), snapshot_id: table.metadata().current_snapshot_id(), }, ``` Thoughts? ########## crates/iceberg/src/transaction/expire_snapshots.rs: ########## @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::spec::TableMetadata; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// Default number of most recent snapshots to retain when none is specified. +const DEFAULT_RETAIN_LAST: usize = 1; + +/// A transaction action that removes snapshots from table metadata. +/// +/// This only rewrites metadata; the now-unreferenced data and metadata files are left untouched. +/// Physical file cleanup is the responsibility of a higher-level maintenance operation built on +/// top of this action. +/// +/// Selection follows Java `RemoveSnapshots`, with a simpler retention model: +/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and age-based expiry +/// ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a snapshot is expired +/// if it is named explicitly *or* it is older than the cutoff. +/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even when they are older +/// than the cutoff; it does not protect snapshots named explicitly. +/// - Snapshots referenced by a branch or tag (including the current snapshot) are never expired, +/// and naming such an id explicitly is an error, since +/// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would otherwise +/// drop the ref silently. +/// +/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, per-branch +/// `min_snapshots_to_keep`) are not yet implemented. +pub struct ExpireSnapshotsAction { + snapshot_ids: Vec<i64>, + older_than_ms: Option<i64>, + retain_last: Option<usize>, +} + +impl ExpireSnapshotsAction { + pub(crate) fn new() -> Self { + Self { + snapshot_ids: vec![], + older_than_ms: None, + retain_last: None, + } + } + + /// Expire these snapshot ids in addition to any age-based selection. + /// + /// An id that is still referenced by a branch or tag cannot be expired and causes + /// [`commit`](TransactionAction::commit) to fail. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.snapshot_ids = snapshot_ids; + self + } + + /// Expire snapshots whose timestamp is strictly older than `older_than_ms`. + pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self { + self.older_than_ms = Some(older_than_ms); + self + } + + /// Keep at least the `retain_last` most recent snapshots when expiring by age (defaults to 1). + /// + /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); it has no effect on + /// its own and does not protect snapshots named via + /// [`expire_snapshot_ids`](Self::expire_snapshot_ids). + pub fn retain_last(mut self, retain_last: usize) -> Self { + self.retain_last = Some(retain_last); + self + } + + fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> { + let metadata = table.metadata(); + let protected = Self::protected_snapshot_ids(metadata); + let existing: HashSet<i64> = metadata.snapshots().map(|s| s.snapshot_id()).collect(); + + let mut to_expire: HashSet<i64> = HashSet::new(); + + // Explicit ids are expired regardless of age, but one still referenced by a branch or tag + // cannot be expired (Java's RemoveSnapshots errors rather than silently dropping the ref). + for id in &self.snapshot_ids { + if protected.contains(id) { + return Err(Self::reference_error(metadata, *id)); + } + if existing.contains(id) { + to_expire.insert(*id); + } + } + + // Age-based expiry, additive with the explicit ids. `retain_last` keeps the most recent + // snapshots even when older than the cutoff. Without a cutoff there is no age expiry. + if let Some(older_than_ms) = self.older_than_ms { + let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST); + let mut snapshots: Vec<_> = metadata.snapshots().cloned().collect(); + if snapshots.len() > retain_last { + snapshots.sort_by_key(|s| s.timestamp_ms()); + snapshots.truncate(snapshots.len() - retain_last); + for snapshot in snapshots { + if snapshot.timestamp_ms() < older_than_ms + && !protected.contains(&snapshot.snapshot_id()) + { + to_expire.insert(snapshot.snapshot_id()); + } + } + } + } + + let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect(); + snapshot_ids.sort_unstable(); + Ok(snapshot_ids) Review Comment: I think there's a subtle correctness issue here that's the deepest thing in the PR. We protect only ref *heads* in `protected_snapshot_ids`, but age expiry walks a global pool. For two branches sharing a common base, the shared ancestor isn't a head, so it gets expired even though it's still reachable via `parent_snapshot_id` — afterwards that pointer dangles and tools that walk full ancestry (Flink, Trino CDC) see truncated history. The same root cause bites `retain_last`: Java's `computeBranchSnapshotsToRetain` applies it per-branch along each ancestry chain, but here it's the N globally-most-recent across all branches. With branch A `[A1,A2,A3]` and branch B `[B1,B2,B3]`, `retain_last=2` keeps `[B2,B3]` globally and expires A2, which Java would retain. I'd build the protected set by walking each branch's ancestor chain via `parent_snapshot_id`, stopping when count >= `retain_last` and timestamp < cutoff, then union that with the head set before age expiry. That's a bigger change, but I don't think we can ship age-based expiry without it. wdyt? ########## crates/iceberg/src/transaction/expire_snapshots.rs: ########## @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::spec::TableMetadata; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// Default number of most recent snapshots to retain when none is specified. +const DEFAULT_RETAIN_LAST: usize = 1; + +/// A transaction action that removes snapshots from table metadata. +/// +/// This only rewrites metadata; the now-unreferenced data and metadata files are left untouched. +/// Physical file cleanup is the responsibility of a higher-level maintenance operation built on +/// top of this action. +/// +/// Selection follows Java `RemoveSnapshots`, with a simpler retention model: +/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and age-based expiry +/// ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a snapshot is expired +/// if it is named explicitly *or* it is older than the cutoff. +/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even when they are older +/// than the cutoff; it does not protect snapshots named explicitly. +/// - Snapshots referenced by a branch or tag (including the current snapshot) are never expired, +/// and naming such an id explicitly is an error, since +/// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would otherwise +/// drop the ref silently. +/// +/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, per-branch +/// `min_snapshots_to_keep`) are not yet implemented. +pub struct ExpireSnapshotsAction { + snapshot_ids: Vec<i64>, + older_than_ms: Option<i64>, + retain_last: Option<usize>, +} + +impl ExpireSnapshotsAction { + pub(crate) fn new() -> Self { + Self { + snapshot_ids: vec![], + older_than_ms: None, + retain_last: None, + } + } + + /// Expire these snapshot ids in addition to any age-based selection. + /// + /// An id that is still referenced by a branch or tag cannot be expired and causes + /// [`commit`](TransactionAction::commit) to fail. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.snapshot_ids = snapshot_ids; + self + } + + /// Expire snapshots whose timestamp is strictly older than `older_than_ms`. + pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self { + self.older_than_ms = Some(older_than_ms); + self + } + + /// Keep at least the `retain_last` most recent snapshots when expiring by age (defaults to 1). + /// + /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); it has no effect on + /// its own and does not protect snapshots named via + /// [`expire_snapshot_ids`](Self::expire_snapshot_ids). + pub fn retain_last(mut self, retain_last: usize) -> Self { + self.retain_last = Some(retain_last); + self + } + + fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> { + let metadata = table.metadata(); + let protected = Self::protected_snapshot_ids(metadata); + let existing: HashSet<i64> = metadata.snapshots().map(|s| s.snapshot_id()).collect(); + + let mut to_expire: HashSet<i64> = HashSet::new(); + + // Explicit ids are expired regardless of age, but one still referenced by a branch or tag + // cannot be expired (Java's RemoveSnapshots errors rather than silently dropping the ref). + for id in &self.snapshot_ids { + if protected.contains(id) { + return Err(Self::reference_error(metadata, *id)); + } + if existing.contains(id) { + to_expire.insert(*id); + } + } + + // Age-based expiry, additive with the explicit ids. `retain_last` keeps the most recent + // snapshots even when older than the cutoff. Without a cutoff there is no age expiry. + if let Some(older_than_ms) = self.older_than_ms { + let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST); + let mut snapshots: Vec<_> = metadata.snapshots().cloned().collect(); + if snapshots.len() > retain_last { + snapshots.sort_by_key(|s| s.timestamp_ms()); + snapshots.truncate(snapshots.len() - retain_last); + for snapshot in snapshots { + if snapshot.timestamp_ms() < older_than_ms + && !protected.contains(&snapshot.snapshot_id()) + { + to_expire.insert(snapshot.snapshot_id()); + } + } + } + } + + let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect(); + snapshot_ids.sort_unstable(); + Ok(snapshot_ids) + } + + /// Snapshot ids that must never be expired: the current snapshot and every branch head and tag + /// target. Removing any of these would silently drop a ref in + /// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots). + fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> { + let mut ids: HashSet<i64> = metadata.refs.values().map(|r| r.snapshot_id).collect(); + if let Some(current) = metadata.current_snapshot_id() { + ids.insert(current); + } + ids + } + + fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error { + if metadata.current_snapshot_id() == Some(snapshot_id) { + return Error::new(ErrorKind::DataInvalid, "Cannot expire the current snapshot"); + } + let refs: Vec<&str> = metadata + .refs + .iter() + .filter(|(_, r)| r.snapshot_id == snapshot_id) + .map(|(name, _)| name.as_str()) + .collect(); + Error::new( + ErrorKind::DataInvalid, + format!("Cannot expire snapshot {snapshot_id}: still referenced by {refs:?}"), + ) + } +} + +#[async_trait] +impl TransactionAction for ExpireSnapshotsAction { + async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { + let snapshot_ids = self.snapshot_ids_to_expire(table)?; + + if snapshot_ids.is_empty() { + return Ok(ActionCommit::new(vec![], vec![])); + } + + Ok(ActionCommit::new( + vec![TableUpdate::RemoveSnapshots { snapshot_ids }], + vec![TableRequirement::UuidMatch { + uuid: table.metadata().uuid(), + }], + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; Review Comment: We never check `gc.enabled` before expiring. Java's `RemoveSnapshots` validates it in the constructor and refuses to run when it's `false`, since expiring metadata defeats a user's explicit decision to disable GC. I'd read it at the top of `commit` (or `snapshot_ids_to_expire`) and bail: ```rust if table.metadata().properties().get("gc.enabled").map(|v| v == "false").unwrap_or(false) { return Err(Error::new(ErrorKind::DataInvalid, "Cannot expire snapshots: gc.enabled is false")); } ``` ########## crates/iceberg/src/transaction/expire_snapshots.rs: ########## @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::spec::TableMetadata; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// Default number of most recent snapshots to retain when none is specified. +const DEFAULT_RETAIN_LAST: usize = 1; + +/// A transaction action that removes snapshots from table metadata. +/// +/// This only rewrites metadata; the now-unreferenced data and metadata files are left untouched. +/// Physical file cleanup is the responsibility of a higher-level maintenance operation built on +/// top of this action. +/// +/// Selection follows Java `RemoveSnapshots`, with a simpler retention model: +/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and age-based expiry +/// ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a snapshot is expired +/// if it is named explicitly *or* it is older than the cutoff. +/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even when they are older +/// than the cutoff; it does not protect snapshots named explicitly. +/// - Snapshots referenced by a branch or tag (including the current snapshot) are never expired, +/// and naming such an id explicitly is an error, since +/// [`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would otherwise +/// drop the ref silently. +/// +/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, per-branch +/// `min_snapshots_to_keep`) are not yet implemented. +pub struct ExpireSnapshotsAction { + snapshot_ids: Vec<i64>, + older_than_ms: Option<i64>, + retain_last: Option<usize>, +} + +impl ExpireSnapshotsAction { + pub(crate) fn new() -> Self { + Self { + snapshot_ids: vec![], + older_than_ms: None, + retain_last: None, + } + } + + /// Expire these snapshot ids in addition to any age-based selection. + /// + /// An id that is still referenced by a branch or tag cannot be expired and causes + /// [`commit`](TransactionAction::commit) to fail. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.snapshot_ids = snapshot_ids; + self + } + + /// Expire snapshots whose timestamp is strictly older than `older_than_ms`. + pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self { + self.older_than_ms = Some(older_than_ms); + self + } + + /// Keep at least the `retain_last` most recent snapshots when expiring by age (defaults to 1). + /// + /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); it has no effect on + /// its own and does not protect snapshots named via + /// [`expire_snapshot_ids`](Self::expire_snapshot_ids). + pub fn retain_last(mut self, retain_last: usize) -> Self { + self.retain_last = Some(retain_last); + self + } Review Comment: This replaces rather than accumulates, so `expire_snapshot_ids(vec![A]).expire_snapshot_ids(vec![B])` silently drops A. Every other builder (`add_data_files` etc.) uses `.extend()`, so the inconsistency is surprising, and since this is public surface, switching to accumulate later would be a breaking change. I'd accumulate and take an iterator to match `add_data_files`: ```rust pub fn expire_snapshot_ids(mut self, snapshot_ids: impl IntoIterator<Item = i64>) -> Self { self.snapshot_ids.extend(snapshot_ids); self } ``` If replace is intentional, I'd rename to `set_snapshot_ids` and document it — but accumulate feels right here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
