laskoviymishka commented on code in PR #2591:
URL: https://github.com/apache/iceberg-rust/pull/2591#discussion_r3383281731


##########
crates/iceberg/src/transaction/expire_snapshots.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// Default number of most recent snapshots to retain when none is specified.
+const DEFAULT_RETAIN_LAST: usize = 1;
+
+/// A transaction action that removes snapshots from table metadata.
+///
+/// This only rewrites metadata; the now-unreferenced data and metadata files 
are left untouched.
+/// Physical file cleanup is the responsibility of a higher-level maintenance 
operation built on
+/// top of this action.
+///
+/// Selection follows Java `RemoveSnapshots`, with a simpler retention model:
+/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and 
age-based expiry
+///   ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a 
snapshot is expired
+///   if it is named explicitly *or* it is older than the cutoff.
+/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even 
when they are older
+///   than the cutoff; it does not protect snapshots named explicitly.
+/// - Snapshots referenced by a branch or tag (including the current snapshot) 
are never expired,
+///   and naming such an id explicitly is an error, since
+///   
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would 
otherwise
+///   drop the ref silently.
+///
+/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, 
per-branch
+/// `min_snapshots_to_keep`) are not yet implemented.
+pub struct ExpireSnapshotsAction {
+    snapshot_ids: Vec<i64>,
+    older_than_ms: Option<i64>,
+    retain_last: Option<usize>,
+}
+
+impl ExpireSnapshotsAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            snapshot_ids: vec![],
+            older_than_ms: None,
+            retain_last: None,
+        }
+    }
+
+    /// Expire these snapshot ids in addition to any age-based selection.
+    ///
+    /// An id that is still referenced by a branch or tag cannot be expired 
and causes
+    /// [`commit`](TransactionAction::commit) to fail.
+    pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self {
+        self.snapshot_ids = snapshot_ids;
+        self
+    }
+
+    /// Expire snapshots whose timestamp is strictly older than 
`older_than_ms`.
+    pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self {
+        self.older_than_ms = Some(older_than_ms);
+        self
+    }
+
+    /// Keep at least the `retain_last` most recent snapshots when expiring by 
age (defaults to 1).
+    ///
+    /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); 
it has no effect on
+    /// its own and does not protect snapshots named via
+    /// [`expire_snapshot_ids`](Self::expire_snapshot_ids).
+    pub fn retain_last(mut self, retain_last: usize) -> Self {
+        self.retain_last = Some(retain_last);
+        self
+    }
+
+    fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> {
+        let metadata = table.metadata();
+        let protected = Self::protected_snapshot_ids(metadata);
+        let existing: HashSet<i64> = metadata.snapshots().map(|s| 
s.snapshot_id()).collect();
+
+        let mut to_expire: HashSet<i64> = HashSet::new();
+
+        // Explicit ids are expired regardless of age, but one still 
referenced by a branch or tag
+        // cannot be expired (Java's RemoveSnapshots errors rather than 
silently dropping the ref).
+        for id in &self.snapshot_ids {
+            if protected.contains(id) {
+                return Err(Self::reference_error(metadata, *id));
+            }
+            if existing.contains(id) {
+                to_expire.insert(*id);
+            }
+        }
+
+        // Age-based expiry, additive with the explicit ids. `retain_last` 
keeps the most recent
+        // snapshots even when older than the cutoff. Without a cutoff there 
is no age expiry.
+        if let Some(older_than_ms) = self.older_than_ms {
+            let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST);
+            let mut snapshots: Vec<_> = 
metadata.snapshots().cloned().collect();
+            if snapshots.len() > retain_last {
+                snapshots.sort_by_key(|s| s.timestamp_ms());
+                snapshots.truncate(snapshots.len() - retain_last);
+                for snapshot in snapshots {
+                    if snapshot.timestamp_ms() < older_than_ms
+                        && !protected.contains(&snapshot.snapshot_id())
+                    {
+                        to_expire.insert(snapshot.snapshot_id());
+                    }
+                }
+            }
+        }
+
+        let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect();
+        snapshot_ids.sort_unstable();
+        Ok(snapshot_ids)
+    }
+
+    /// Snapshot ids that must never be expired: the current snapshot and 
every branch head and tag
+    /// target. Removing any of these would silently drop a ref in
+    /// 
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots).
+    fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> {
+        let mut ids: HashSet<i64> = metadata.refs.values().map(|r| 
r.snapshot_id).collect();
+        if let Some(current) = metadata.current_snapshot_id() {
+            ids.insert(current);
+        }
+        ids
+    }
+
+    fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error {
+        if metadata.current_snapshot_id() == Some(snapshot_id) {
+            return Error::new(ErrorKind::DataInvalid, "Cannot expire the 
current snapshot");
+        }
+        let refs: Vec<&str> = metadata
+            .refs
+            .iter()
+            .filter(|(_, r)| r.snapshot_id == snapshot_id)
+            .map(|(name, _)| name.as_str())
+            .collect();
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot expire snapshot {snapshot_id}: still referenced by 
{refs:?}"),
+        )
+    }
+}
+
+#[async_trait]
+impl TransactionAction for ExpireSnapshotsAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_ids = self.snapshot_ids_to_expire(table)?;
+
+        if snapshot_ids.is_empty() {
+            return Ok(ActionCommit::new(vec![], vec![]));
+        }
+
+        Ok(ActionCommit::new(
+            vec![TableUpdate::RemoveSnapshots { snapshot_ids }],
+            vec![TableRequirement::UuidMatch {
+                uuid: table.metadata().uuid(),
+            }],
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use crate::TableUpdate;
+    use crate::transaction::Transaction;
+    use crate::transaction::action::{ApplyTransactionAction, 
TransactionAction};
+    use crate::transaction::expire_snapshots::ExpireSnapshotsAction;
+    use crate::transaction::tests::make_v2_table;
+
+    // `make_v2_table` carries an older snapshot (ts 1515100955770) and a 
current
+    // snapshot (ts 1555100955770).
+    const OLD_SNAPSHOT: i64 = 3051729675574597004;
+    const CURRENT_SNAPSHOT: i64 = 3055729675574597004;
+
+    async fn removed_ids(action: ExpireSnapshotsAction) -> Vec<i64> {
+        let table = make_v2_table();
+        let mut commit = Arc::new(action).commit(&table).await.unwrap();
+        match commit.take_updates().into_iter().next() {
+            Some(TableUpdate::RemoveSnapshots { snapshot_ids }) => 
snapshot_ids,
+            None => vec![],
+            other => panic!("unexpected update: {other:?}"),
+        }
+    }
+
+    fn action() -> ExpireSnapshotsAction {
+        ExpireSnapshotsAction::new()
+    }
+
+    #[tokio::test]
+    async fn test_expire_explicit_snapshot_id() {
+        assert_eq!(
+            
removed_ids(action().expire_snapshot_ids(vec![OLD_SNAPSHOT])).await,
+            vec![OLD_SNAPSHOT]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_explicit_unknown_id_is_ignored() {
+        assert!(
+            removed_ids(action().expire_snapshot_ids(vec![42]))
+                .await
+                .is_empty()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_cannot_expire_current_snapshot() {
+        let table = make_v2_table();

Review Comment:
   The PR description says the combined explicit + age path is tested, but I 
don't see a test that sets both `expire_snapshot_ids` and `expire_older_than_ms` 
— and that is the primary interaction between the two code paths, so it's the 
most likely regression site.
   
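   I'd add one with non-overlapping targets: 
`expire_snapshot_ids(vec![X]).expire_older_than_ms(cutoff)` where an 
age-qualifying snapshot Y is distinct from X, asserting both land in the result. 
Note that `make_v2_table` has only one non-current snapshot (the current one is 
protected), so this needs a fixture with at least two expirable snapshots.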



##########
crates/iceberg/src/transaction/expire_snapshots.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// Default number of most recent snapshots to retain when none is specified.
+const DEFAULT_RETAIN_LAST: usize = 1;
+
+/// A transaction action that removes snapshots from table metadata.
+///
+/// This only rewrites metadata; the now-unreferenced data and metadata files 
are left untouched.
+/// Physical file cleanup is the responsibility of a higher-level maintenance 
operation built on
+/// top of this action.
+///
+/// Selection follows Java `RemoveSnapshots`, with a simpler retention model:
+/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and 
age-based expiry
+///   ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a 
snapshot is expired
+///   if it is named explicitly *or* it is older than the cutoff.
+/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even 
when they are older
+///   than the cutoff; it does not protect snapshots named explicitly.
+/// - Snapshots referenced by a branch or tag (including the current snapshot) 
are never expired,
+///   and naming such an id explicitly is an error, since
+///   
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would 
otherwise
+///   drop the ref silently.
+///
+/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, 
per-branch
+/// `min_snapshots_to_keep`) are not yet implemented.
+pub struct ExpireSnapshotsAction {
+    snapshot_ids: Vec<i64>,
+    older_than_ms: Option<i64>,
+    retain_last: Option<usize>,
+}
+
+impl ExpireSnapshotsAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            snapshot_ids: vec![],
+            older_than_ms: None,
+            retain_last: None,
+        }
+    }
+
+    /// Expire these snapshot ids in addition to any age-based selection.
+    ///
+    /// An id that is still referenced by a branch or tag cannot be expired 
and causes
+    /// [`commit`](TransactionAction::commit) to fail.
+    pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self {
+        self.snapshot_ids = snapshot_ids;
+        self
+    }
+
+    /// Expire snapshots whose timestamp is strictly older than 
`older_than_ms`.
+    pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self {
+        self.older_than_ms = Some(older_than_ms);
+        self
+    }
+
+    /// Keep at least the `retain_last` most recent snapshots when expiring by 
age (defaults to 1).
+    ///
+    /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); 
it has no effect on
+    /// its own and does not protect snapshots named via
+    /// [`expire_snapshot_ids`](Self::expire_snapshot_ids).
+    pub fn retain_last(mut self, retain_last: usize) -> Self {
+        self.retain_last = Some(retain_last);
+        self
+    }
+
+    fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> {
+        let metadata = table.metadata();
+        let protected = Self::protected_snapshot_ids(metadata);
+        let existing: HashSet<i64> = metadata.snapshots().map(|s| 
s.snapshot_id()).collect();
+
+        let mut to_expire: HashSet<i64> = HashSet::new();
+
+        // Explicit ids are expired regardless of age, but one still 
referenced by a branch or tag
+        // cannot be expired (Java's RemoveSnapshots errors rather than 
silently dropping the ref).
+        for id in &self.snapshot_ids {
+            if protected.contains(id) {
+                return Err(Self::reference_error(metadata, *id));
+            }
+            if existing.contains(id) {
+                to_expire.insert(*id);
+            }
+        }
+
+        // Age-based expiry, additive with the explicit ids. `retain_last` 
keeps the most recent
+        // snapshots even when older than the cutoff. Without a cutoff there 
is no age expiry.
+        if let Some(older_than_ms) = self.older_than_ms {
+            let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST);
+            let mut snapshots: Vec<_> = 
metadata.snapshots().cloned().collect();
+            if snapshots.len() > retain_last {
+                snapshots.sort_by_key(|s| s.timestamp_ms());
+                snapshots.truncate(snapshots.len() - retain_last);
+                for snapshot in snapshots {
+                    if snapshot.timestamp_ms() < older_than_ms
+                        && !protected.contains(&snapshot.snapshot_id())
+                    {
+                        to_expire.insert(snapshot.snapshot_id());
+                    }
+                }
+            }
+        }
+
+        let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect();
+        snapshot_ids.sort_unstable();
+        Ok(snapshot_ids)
+    }
+
+    /// Snapshot ids that must never be expired: the current snapshot and 
every branch head and tag
+    /// target. Removing any of these would silently drop a ref in
+    /// 
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots).
+    fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> {
+        let mut ids: HashSet<i64> = metadata.refs.values().map(|r| 
r.snapshot_id).collect();
+        if let Some(current) = metadata.current_snapshot_id() {
+            ids.insert(current);
+        }
+        ids
+    }
+
+    fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error {
+        if metadata.current_snapshot_id() == Some(snapshot_id) {
+            return Error::new(ErrorKind::DataInvalid, "Cannot expire the 
current snapshot");
+        }
+        let refs: Vec<&str> = metadata
+            .refs
+            .iter()
+            .filter(|(_, r)| r.snapshot_id == snapshot_id)
+            .map(|(name, _)| name.as_str())
+            .collect();
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot expire snapshot {snapshot_id}: still referenced by 
{refs:?}"),
+        )
+    }
+}
+
+#[async_trait]
+impl TransactionAction for ExpireSnapshotsAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_ids = self.snapshot_ids_to_expire(table)?;
+
+        if snapshot_ids.is_empty() {
+            return Ok(ActionCommit::new(vec![], vec![]));
+        }
+
+        Ok(ActionCommit::new(
+            vec![TableUpdate::RemoveSnapshots { snapshot_ids }],
+            vec![TableRequirement::UuidMatch {
+                uuid: table.metadata().uuid(),
+            }],
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use crate::TableUpdate;
+    use crate::transaction::Transaction;
+    use crate::transaction::action::{ApplyTransactionAction, 
TransactionAction};
+    use crate::transaction::expire_snapshots::ExpireSnapshotsAction;
+    use crate::transaction::tests::make_v2_table;
+
+    // `make_v2_table` carries an older snapshot (ts 1515100955770) and a 
current
+    // snapshot (ts 1555100955770).

Review Comment:
   `UuidMatch` alone leaves a race window. If another writer commits a new 
snapshot between `snapshot_ids_to_expire` and `update_table`, that snapshot is 
now live but its parent may be in our to-expire set — removing the parent 
orphans the new branch history, and the UUID check won't catch it because the 
table UUID is unchanged.
   
   `SnapshotProducer` guards against exactly this with a `RefSnapshotIdMatch` 
on `main` (see snapshot.rs ~517). I'd add the same here:
   
   ```rust
   TableRequirement::RefSnapshotIdMatch {
       r#ref: "main".to_string(),
       snapshot_id: table.metadata().current_snapshot_id(),
   },
   ```
   
   Thoughts?



##########
crates/iceberg/src/transaction/expire_snapshots.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// Default number of most recent snapshots to retain when none is specified.
+const DEFAULT_RETAIN_LAST: usize = 1;
+
+/// A transaction action that removes snapshots from table metadata.
+///
+/// This only rewrites metadata; the now-unreferenced data and metadata files 
are left untouched.
+/// Physical file cleanup is the responsibility of a higher-level maintenance 
operation built on
+/// top of this action.
+///
+/// Selection follows Java `RemoveSnapshots`, with a simpler retention model:
+/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and 
age-based expiry
+///   ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a 
snapshot is expired
+///   if it is named explicitly *or* it is older than the cutoff.
+/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even 
when they are older
+///   than the cutoff; it does not protect snapshots named explicitly.
+/// - Snapshots referenced by a branch or tag (including the current snapshot) 
are never expired,
+///   and naming such an id explicitly is an error, since
+///   
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would 
otherwise
+///   drop the ref silently.
+///
+/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, 
per-branch
+/// `min_snapshots_to_keep`) are not yet implemented.
+pub struct ExpireSnapshotsAction {
+    snapshot_ids: Vec<i64>,
+    older_than_ms: Option<i64>,
+    retain_last: Option<usize>,
+}
+
+impl ExpireSnapshotsAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            snapshot_ids: vec![],
+            older_than_ms: None,
+            retain_last: None,
+        }
+    }
+
+    /// Expire these snapshot ids in addition to any age-based selection.
+    ///
+    /// An id that is still referenced by a branch or tag cannot be expired 
and causes
+    /// [`commit`](TransactionAction::commit) to fail.
+    pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self {
+        self.snapshot_ids = snapshot_ids;
+        self
+    }
+
+    /// Expire snapshots whose timestamp is strictly older than 
`older_than_ms`.
+    pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self {
+        self.older_than_ms = Some(older_than_ms);
+        self
+    }
+
+    /// Keep at least the `retain_last` most recent snapshots when expiring by 
age (defaults to 1).
+    ///
+    /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); 
it has no effect on
+    /// its own and does not protect snapshots named via
+    /// [`expire_snapshot_ids`](Self::expire_snapshot_ids).
+    pub fn retain_last(mut self, retain_last: usize) -> Self {
+        self.retain_last = Some(retain_last);
+        self
+    }
+
+    fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> {
+        let metadata = table.metadata();
+        let protected = Self::protected_snapshot_ids(metadata);
+        let existing: HashSet<i64> = metadata.snapshots().map(|s| 
s.snapshot_id()).collect();
+
+        let mut to_expire: HashSet<i64> = HashSet::new();
+
+        // Explicit ids are expired regardless of age, but one still 
referenced by a branch or tag
+        // cannot be expired (Java's RemoveSnapshots errors rather than 
silently dropping the ref).
+        for id in &self.snapshot_ids {
+            if protected.contains(id) {
+                return Err(Self::reference_error(metadata, *id));
+            }
+            if existing.contains(id) {
+                to_expire.insert(*id);
+            }
+        }
+
+        // Age-based expiry, additive with the explicit ids. `retain_last` 
keeps the most recent
+        // snapshots even when older than the cutoff. Without a cutoff there 
is no age expiry.
+        if let Some(older_than_ms) = self.older_than_ms {
+            let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST);
+            let mut snapshots: Vec<_> = 
metadata.snapshots().cloned().collect();
+            if snapshots.len() > retain_last {
+                snapshots.sort_by_key(|s| s.timestamp_ms());
+                snapshots.truncate(snapshots.len() - retain_last);
+                for snapshot in snapshots {
+                    if snapshot.timestamp_ms() < older_than_ms
+                        && !protected.contains(&snapshot.snapshot_id())
+                    {
+                        to_expire.insert(snapshot.snapshot_id());
+                    }
+                }
+            }
+        }
+
+        let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect();
+        snapshot_ids.sort_unstable();
+        Ok(snapshot_ids)

Review Comment:
   I think there's a subtle correctness issue here that's the deepest thing in 
the PR.
   
   We protect only ref *heads* in `protected_snapshot_ids`, but age expiry 
walks a global pool. For two branches sharing a common base, the shared 
ancestor isn't a head, so it gets expired even though it's still reachable via 
`parent_snapshot_id` — afterwards that pointer dangles and tools that walk full 
ancestry (Flink, Trino CDC) see truncated history.
   
   The same root cause bites `retain_last`: Java's 
`computeBranchSnapshotsToRetain` applies it per-branch along each ancestry 
chain, but here it's the N globally-most-recent snapshots across all branches. 
With branch A `[A1,A2,A3]` and branch B `[B1,B2,B3]`, where every B snapshot is 
newer than every A snapshot and all are older than the cutoff, `retain_last=2` 
keeps only `[B2,B3]` globally (A3 survives only because it is a protected head) 
and expires A2, which Java would retain.
   
   I'd build the protected set by walking each branch's ancestor chain via 
`parent_snapshot_id`, stopping when count >= `retain_last` and timestamp < 
cutoff, then union that with the head set before age expiry. That's a bigger 
change, but I don't think we can ship age-based expiry without it. wdyt?



##########
crates/iceberg/src/transaction/expire_snapshots.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// Default number of most recent snapshots to retain when none is specified.
+const DEFAULT_RETAIN_LAST: usize = 1;
+
+/// A transaction action that removes snapshots from table metadata.
+///
+/// This only rewrites metadata; the now-unreferenced data and metadata files 
are left untouched.
+/// Physical file cleanup is the responsibility of a higher-level maintenance 
operation built on
+/// top of this action.
+///
+/// Selection follows Java `RemoveSnapshots`, with a simpler retention model:
+/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and 
age-based expiry
+///   ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a 
snapshot is expired
+///   if it is named explicitly *or* it is older than the cutoff.
+/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even 
when they are older
+///   than the cutoff; it does not protect snapshots named explicitly.
+/// - Snapshots referenced by a branch or tag (including the current snapshot) 
are never expired,
+///   and naming such an id explicitly is an error, since
+///   
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would 
otherwise
+///   drop the ref silently.
+///
+/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, 
per-branch
+/// `min_snapshots_to_keep`) are not yet implemented.
+pub struct ExpireSnapshotsAction {
+    snapshot_ids: Vec<i64>,
+    older_than_ms: Option<i64>,
+    retain_last: Option<usize>,
+}
+
+impl ExpireSnapshotsAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            snapshot_ids: vec![],
+            older_than_ms: None,
+            retain_last: None,
+        }
+    }
+
+    /// Expire these snapshot ids in addition to any age-based selection.
+    ///
+    /// An id that is still referenced by a branch or tag cannot be expired 
and causes
+    /// [`commit`](TransactionAction::commit) to fail.
+    pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self {
+        self.snapshot_ids = snapshot_ids;
+        self
+    }
+
+    /// Expire snapshots whose timestamp is strictly older than 
`older_than_ms`.
+    pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self {
+        self.older_than_ms = Some(older_than_ms);
+        self
+    }
+
+    /// Keep at least the `retain_last` most recent snapshots when expiring by 
age (defaults to 1).
+    ///
+    /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); 
it has no effect on
+    /// its own and does not protect snapshots named via
+    /// [`expire_snapshot_ids`](Self::expire_snapshot_ids).
+    pub fn retain_last(mut self, retain_last: usize) -> Self {
+        self.retain_last = Some(retain_last);
+        self
+    }
+
+    fn snapshot_ids_to_expire(&self, table: &Table) -> Result<Vec<i64>> {
+        let metadata = table.metadata();
+        let protected = Self::protected_snapshot_ids(metadata);
+        let existing: HashSet<i64> = metadata.snapshots().map(|s| 
s.snapshot_id()).collect();
+
+        let mut to_expire: HashSet<i64> = HashSet::new();
+
+        // Explicit ids are expired regardless of age, but one still 
referenced by a branch or tag
+        // cannot be expired (Java's RemoveSnapshots errors rather than 
silently dropping the ref).
+        for id in &self.snapshot_ids {
+            if protected.contains(id) {
+                return Err(Self::reference_error(metadata, *id));
+            }
+            if existing.contains(id) {
+                to_expire.insert(*id);
+            }
+        }
+
+        // Age-based expiry, additive with the explicit ids. `retain_last` 
keeps the most recent
+        // snapshots even when older than the cutoff. Without a cutoff there 
is no age expiry.
+        if let Some(older_than_ms) = self.older_than_ms {
+            let retain_last = self.retain_last.unwrap_or(DEFAULT_RETAIN_LAST);
+            let mut snapshots: Vec<_> = 
metadata.snapshots().cloned().collect();
+            if snapshots.len() > retain_last {
+                snapshots.sort_by_key(|s| s.timestamp_ms());
+                snapshots.truncate(snapshots.len() - retain_last);
+                for snapshot in snapshots {
+                    if snapshot.timestamp_ms() < older_than_ms
+                        && !protected.contains(&snapshot.snapshot_id())
+                    {
+                        to_expire.insert(snapshot.snapshot_id());
+                    }
+                }
+            }
+        }
+
+        let mut snapshot_ids: Vec<i64> = to_expire.into_iter().collect();
+        snapshot_ids.sort_unstable();
+        Ok(snapshot_ids)
+    }
+
+    /// Snapshot ids that must never be expired: the current snapshot and 
every branch head and tag
+    /// target. Removing any of these would silently drop a ref in
+    /// 
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots).
+    fn protected_snapshot_ids(metadata: &TableMetadata) -> HashSet<i64> {
+        let mut ids: HashSet<i64> = metadata.refs.values().map(|r| 
r.snapshot_id).collect();
+        if let Some(current) = metadata.current_snapshot_id() {
+            ids.insert(current);
+        }
+        ids
+    }
+
+    fn reference_error(metadata: &TableMetadata, snapshot_id: i64) -> Error {
+        if metadata.current_snapshot_id() == Some(snapshot_id) {
+            return Error::new(ErrorKind::DataInvalid, "Cannot expire the 
current snapshot");
+        }
+        let refs: Vec<&str> = metadata
+            .refs
+            .iter()
+            .filter(|(_, r)| r.snapshot_id == snapshot_id)
+            .map(|(name, _)| name.as_str())
+            .collect();
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot expire snapshot {snapshot_id}: still referenced by 
{refs:?}"),
+        )
+    }
+}
+
+#[async_trait]
+impl TransactionAction for ExpireSnapshotsAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_ids = self.snapshot_ids_to_expire(table)?;
+
+        if snapshot_ids.is_empty() {
+            return Ok(ActionCommit::new(vec![], vec![]));
+        }
+
+        Ok(ActionCommit::new(
+            vec![TableUpdate::RemoveSnapshots { snapshot_ids }],
+            vec![TableRequirement::UuidMatch {
+                uuid: table.metadata().uuid(),
+            }],
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;

Review Comment:
   We never check `gc.enabled` before expiring. Java's `RemoveSnapshots` 
validates it in the constructor and refuses to run when it's `false`, since 
expiring metadata defeats a user's explicit decision to disable GC.
   
   I'd read it at the top of `commit` (or `snapshot_ids_to_expire`) and bail:
   
   ```rust
   if table.metadata().properties().get("gc.enabled").map(|v| v == 
"false").unwrap_or(false) {
       return Err(Error::new(ErrorKind::DataInvalid,
           "Cannot expire snapshots: gc.enabled is false"));
   }
   ```



##########
crates/iceberg/src/transaction/expire_snapshots.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// Default number of most recent snapshots to retain when none is specified.
+const DEFAULT_RETAIN_LAST: usize = 1;
+
+/// A transaction action that removes snapshots from table metadata.
+///
+/// This only rewrites metadata; the now-unreferenced data and metadata files 
are left untouched.
+/// Physical file cleanup is the responsibility of a higher-level maintenance 
operation built on
+/// top of this action.
+///
+/// Selection follows Java `RemoveSnapshots`, with a simpler retention model:
+/// - Explicit ids ([`expire_snapshot_ids`](Self::expire_snapshot_ids)) and 
age-based expiry
+///   ([`expire_older_than_ms`](Self::expire_older_than_ms)) are combined: a 
snapshot is expired
+///   if it is named explicitly *or* it is older than the cutoff.
+/// - [`retain_last`](Self::retain_last) keeps the most recent snapshots even 
when they are older
+///   than the cutoff; it does not protect snapshots named explicitly.
+/// - Snapshots referenced by a branch or tag (including the current snapshot) 
are never expired,
+///   and naming such an id explicitly is an error, since
+///   
[`remove_snapshots`](crate::spec::TableMetadataBuilder::remove_snapshots) would 
otherwise
+///   drop the ref silently.
+///
+/// Per-ref retention windows (`max_snapshot_age_ms`, `max_ref_age_ms`, 
per-branch
+/// `min_snapshots_to_keep`) are not yet implemented.
+pub struct ExpireSnapshotsAction {
+    snapshot_ids: Vec<i64>,
+    older_than_ms: Option<i64>,
+    retain_last: Option<usize>,
+}
+
+impl ExpireSnapshotsAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            snapshot_ids: vec![],
+            older_than_ms: None,
+            retain_last: None,
+        }
+    }
+
+    /// Expire these snapshot ids in addition to any age-based selection.
+    ///
+    /// An id that is still referenced by a branch or tag cannot be expired 
and causes
+    /// [`commit`](TransactionAction::commit) to fail.
+    pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self {
+        self.snapshot_ids = snapshot_ids;
+        self
+    }
+
+    /// Expire snapshots whose timestamp is strictly older than 
`older_than_ms`.
+    pub fn expire_older_than_ms(mut self, older_than_ms: i64) -> Self {
+        self.older_than_ms = Some(older_than_ms);
+        self
+    }
+
+    /// Keep at least the `retain_last` most recent snapshots when expiring by 
age (defaults to 1).
+    ///
+    /// This only bounds [`expire_older_than_ms`](Self::expire_older_than_ms); 
it has no effect on
+    /// its own and does not protect snapshots named via
+    /// [`expire_snapshot_ids`](Self::expire_snapshot_ids).
+    pub fn retain_last(mut self, retain_last: usize) -> Self {
+        self.retain_last = Some(retain_last);
+        self
+    }

Review Comment:
   This replaces rather than accumulates, so
   `expire_snapshot_ids(vec![A]).expire_snapshot_ids(vec![B])` silently drops A.
   Other builders such as `add_data_files` use `.extend()`, so the inconsistency
   is surprising. Since this is public API, switching to accumulation later would
   be a breaking change.
   
   I'd accumulate and take an iterator to match `add_data_files`:
   
   ```rust
   pub fn expire_snapshot_ids(mut self, snapshot_ids: impl IntoIterator<Item = i64>) -> Self {
       self.snapshot_ids.extend(snapshot_ids);
       self
   }
   ```
   
   If replacing is intentional, I'd rename it to `set_snapshot_ids` and document
   that behavior — but accumulating feels right here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to