This is an automated email from the ASF dual-hosted git repository.

xxubai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f379346 [Hotfix] Force ReachableFileCleanup when snapshots exist outside main ancestry (#4231)
d8f379346 is described below

commit d8f379346fcc8eb899808fbf2d41d39009b6b4ac
Author: Jiwon Park <[email protected]>
AuthorDate: Thu May 28 14:56:34 2026 +0900

    [Hotfix] Force ReachableFileCleanup when snapshots exist outside main ancestry (#4231)
    
    [optimizer] Force reachable cleanup when snapshots exist outside main ancestry
    
    Iceberg's auto-selected IncrementalFileCleanup can silently truncate its
    ancestor walk when a parent snapshot is missing, deleting data files the
    current snapshot still references. Force the safe ReachableFileCleanup only
    when snapshots exist outside the main ancestry; healthy tables are unchanged.
    
    Signed-off-by: Jiwon Park <[email protected]>
---
 .../TestExpireSnapshotsKeepReferencedFiles.java    | 136 +++++++++++++++++++++
 .../iceberg/maintainer/IcebergTableMaintainer.java |  51 ++++++--
 .../apache/iceberg/ReachableFileCleanupBridge.java |  52 ++++++++
 3 files changed, 231 insertions(+), 8 deletions(-)

diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java
new file mode 100644
index 000000000..9a93962e9
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.data.Record;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Reproduces a data-file loss observed in production during snapshot expiration.
+ *
+ * <p>When a table has a single ref (the common case after a tag is dropped), {@code
+ * RemoveSnapshots} auto-selects {@link org.apache.iceberg.IncrementalFileCleanup}. That strategy
+ * walks the current snapshot's ancestor chain via {@code SnapshotUtil.ancestorIds}; if a parent
+ * snapshot is missing from metadata (e.g. expired by an earlier cycle) the walk terminates
+ * silently. Snapshots below that break are then treated as "not an ancestor", so the ADDED entries
+ * in their (superseded but still referenced) manifests are reverted and the data files are
+ * physically deleted - even though those files are still carried over as EXISTING entries in the
+ * current snapshot's manifest. The result is a current snapshot that references a missing file.
+ *
+ * <p>This test builds exactly that state and asserts the invariant that expiration must never
+ * delete a data file referenced by the current snapshot. It fails on the buggy incremental path and
+ * passes once {@link IcebergTableMaintainer} detects the off-main snapshot and forces the reachable
+ * cleanup strategy, which never walks the ancestor chain.
+ */
+@RunWith(Parameterized.class)
+public class TestExpireSnapshotsKeepReferencedFiles extends ExecutorTestBase {
+
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {new BasicCatalogTestHelper(TableFormat.ICEBERG), new 
BasicTableTestHelper(false, false)}
+    };
+  }
+
+  public TestExpireSnapshotsKeepReferencedFiles(
+      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper);
+  }
+
+  @Test
+  public void testExpireKeepsFilesReferencedByCurrentSnapshot() {
+    UnkeyedTable table = getMixedTable().asUnkeyedTable();
+    // Merge manifests explicitly below; keep auto-merge off so the fixture is deterministic.
+    table
+        .updateProperties()
+        .set("commit.manifest-merge.enabled", "false")
+        .set(TableProperties.SNAPSHOT_KEEP_DURATION, "0")
+        .commit();
+
+    // S0, S1: each append writes one data file in its own manifest (F0 in m0, F1 in m1).
+    DataFile f0 = appendOneRecord(table, 1, 1L);
+    DataFile f1 = appendOneRecord(table, 2, 2L);
+
+    // S2: rewrite manifests into a single merged manifest holding F0, F1 as EXISTING entries.
+    // m0/m1 are now superseded but remain referenced by the manifest lists of S0/S1.
+    table.rewriteManifests().clusterBy(file -> 0).commit();
+    long midSnapshotId = table.currentSnapshot().snapshotId();
+
+    // S3 (head): append F2. The current snapshot still references F0, F1 via the merged manifest.
+    DataFile f2 = appendOneRecord(table, 3, 3L);
+
+    Assert.assertTrue(table.io().exists(f0.path().toString()));
+    Assert.assertTrue(table.io().exists(f1.path().toString()));
+
+    // Explicitly expire the middle snapshot S2. This breaks head's ancestor chain
+    // (head.parent == S2, now absent), so a later ancestor walk truncates above S0/S1.
+    // Snapshot-id expiration uses reachable cleanup, so the baseline files stay safe here.
+    
table.expireSnapshots().expireSnapshotId(midSnapshotId).cleanExpiredFiles(true).commit();
+    Assert.assertTrue(table.io().exists(f0.path().toString()));
+    Assert.assertTrue(table.io().exists(f1.path().toString()));
+
+    // Only the main ref remains, which is what makes iceberg auto-select incremental cleanup.
+    Assert.assertEquals(1, table.refs().size());
+
+    IcebergTableMaintainer maintainer =
+        new IcebergTableMaintainer(table, table.id(), 
TestTableMaintainerContext.of(table));
+    maintainer.expireSnapshots(System.currentTimeMillis(), 1);
+
+    // Sanity: head retained, history expired.
+    Assert.assertEquals(1, Iterables.size(table.snapshots()));
+    Assert.assertTrue(table.io().exists(f2.path().toString()));
+
+    // Invariant: F0 and F1 are still referenced by the current snapshot, so they must survive.
+    Assert.assertTrue(
+        "F0 is referenced by the current snapshot and must not be deleted by 
expiration",
+        table.io().exists(f0.path().toString()));
+    Assert.assertTrue(
+        "F1 is referenced by the current snapshot and must not be deleted by 
expiration",
+        table.io().exists(f1.path().toString()));
+  }
+
+  private DataFile appendOneRecord(UnkeyedTable table, int id, long txId) {
+    Record record = tableTestHelper().generateTestRecord(id, "name" + id, 0, 
"2022-01-01T00:00:00");
+    List<DataFile> dataFiles =
+        tableTestHelper().writeBaseStore(table, txId, 
Lists.newArrayList(record), false);
+    Assert.assertEquals(1, dataFiles.size());
+    AppendFiles appendFiles = table.newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+    return dataFiles.get(0);
+  }
+}
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index f906d4b86..f121d1fe6 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -46,10 +46,12 @@ import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileCleanupBridge;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Schema;
@@ -70,6 +72,7 @@ import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.SerializableFunction;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -201,14 +204,24 @@ public class IcebergTableMaintainer implements TableMaintainer {
         minCount,
         exclude);
     RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), 
exclude);
-    table
-        .expireSnapshots()
-        .retainLast(Math.max(minCount, 1))
-        .expireOlderThan(olderThan)
-        .deleteWith(expiredFileCleaner::addFile)
-        .cleanExpiredFiles(
-            true) /* enable clean only for collecting the expired files, will 
delete them later */
-        .commit();
+    ExpireSnapshots expireSnapshots =
+        table
+            .expireSnapshots()
+            .retainLast(Math.max(minCount, 1))
+            .expireOlderThan(olderThan)
+            .deleteWith(expiredFileCleaner::addFile)
+            .cleanExpiredFiles(
+                true) /* enable clean only for collecting the expired files, 
will delete them later */;
+    // iceberg auto-selects IncrementalFileCleanup for single-ref tables. That strategy walks the
+    // current snapshot's ancestor chain and can terminate silently at a missing parent, then revert
+    // the ADDED entries of superseded-but-still-referenced manifests below the break - physically
+    // deleting data files the current snapshot still references (observed as partition data-file
+    // loss in production). The walk only truncates when a snapshot sits outside the current main
+    // ancestry, so force the safe ReachableFileCleanup then.
+    if (hasSnapshotsOutsideMainAncestry()) {
+      ReachableFileCleanupBridge.forceReachable(expireSnapshots);
+    }
+    expireSnapshots.commit();
 
     int collectedFiles = expiredFileCleaner.fileCount();
     expiredFileCleaner.clear();
@@ -227,6 +240,28 @@ public class IcebergTableMaintainer implements TableMaintainer {
     }
   }
 
+  /**
+   * Whether the table holds any snapshot that is not reachable from the current snapshot's ancestor
+   * chain. Such a snapshot makes {@code IncrementalFileCleanup}'s ancestor walk truncate, which can
+   * lead it to delete data files still referenced by the current snapshot. The walk used here is
+   * the same {@link SnapshotUtil#ancestorIds} used by the cleanup, so it detects exactly the states
+   * the incremental strategy would mishandle.
+   */
+  private boolean hasSnapshotsOutsideMainAncestry() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot == null) {
+      return false;
+    }
+    Set<Long> mainAncestors =
+        Sets.newHashSet(SnapshotUtil.ancestorIds(currentSnapshot, 
table::snapshot));
+    for (Snapshot snapshot : table.snapshots()) {
+      if (!mainAncestors.contains(snapshot.snapshotId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public void expireData() {
     DataExpirationConfig expirationConfig = 
context.getTableConfiguration().getExpiringDataConfig();
diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java
new file mode 100644
index 000000000..fbc5eabcc
--- /dev/null
+++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Bridge into iceberg-core to force the reachable file-cleanup strategy during snapshot expiration.
+ *
+ * <p>{@link RemoveSnapshots#withIncrementalCleanup(boolean)} is package-private, so this class
+ * lives in {@code org.apache.iceberg} to call it without reflection. Binding at compile time means
+ * a future change to that method's signature breaks the build instead of silently falling back.
+ *
+ * <p>By default iceberg auto-selects {@link IncrementalFileCleanup} whenever the table has a single
+ * ref. That strategy walks the current snapshot's ancestor chain and terminates silently if a
+ * parent snapshot is missing from metadata; snapshots below the break are then treated as
+ * non-ancestors and the ADDED entries in their superseded-but-still-referenced manifests are
+ * reverted, physically deleting data files the current snapshot still references. Forcing {@link
+ * ReachableFileCleanup} avoids the ancestor walk entirely and is safe for any ref count.
+ */
+public final class ReachableFileCleanupBridge {
+
+  private ReachableFileCleanupBridge() {}
+
+  /**
+   * Forces {@code expireSnapshots} to use {@link ReachableFileCleanup} instead of letting iceberg
+   * auto-select the incremental strategy.
+   *
+   * @param expireSnapshots the expire operation to configure
+   * @return the same operation, for fluent chaining
+   */
+  public static ExpireSnapshots forceReachable(ExpireSnapshots 
expireSnapshots) {
+    if (expireSnapshots instanceof RemoveSnapshots) {
+      ((RemoveSnapshots) expireSnapshots).withIncrementalCleanup(false);
+    }
+    return expireSnapshots;
+  }
+}

Reply via email to