This is an automated email from the ASF dual-hosted git repository.
xxubai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new d8f379346 [Hotfix] Force ReachableFileCleanup when snapshots exist
outside main ancestry (#4231)
d8f379346 is described below
commit d8f379346fcc8eb899808fbf2d41d39009b6b4ac
Author: Jiwon Park <[email protected]>
AuthorDate: Thu May 28 14:56:34 2026 +0900
[Hotfix] Force ReachableFileCleanup when snapshots exist outside main
ancestry (#4231)
[optimizer] Force reachable cleanup when snapshots exist outside main
ancestry
Iceberg's auto-selected IncrementalFileCleanup can silently truncate its
ancestor walk when a parent snapshot is missing, deleting data files the
current snapshot still references. Force the safe ReachableFileCleanup only
when snapshots exist outside the main ancestry; healthy tables are
unchanged.
Signed-off-by: Jiwon Park <[email protected]>
---
.../TestExpireSnapshotsKeepReferencedFiles.java | 136 +++++++++++++++++++++
.../iceberg/maintainer/IcebergTableMaintainer.java | 51 ++++++--
.../apache/iceberg/ReachableFileCleanupBridge.java | 52 ++++++++
3 files changed, 231 insertions(+), 8 deletions(-)
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java
new file mode 100644
index 000000000..9a93962e9
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.data.Record;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Regression test for a data-file loss observed in production during snapshot expiration.
+ *
+ * <p>When a table has a single ref (the common case after a tag is dropped), {@code
+ * RemoveSnapshots} auto-selects {@code org.apache.iceberg.IncrementalFileCleanup}. That strategy
+ * walks the current snapshot's ancestor chain via {@code SnapshotUtil.ancestorIds}; if a parent
+ * snapshot is missing from metadata (e.g. expired by an earlier cycle) the walk stops without any
+ * error. Snapshots below the break are then treated as "not an ancestor", so the ADDED entries in
+ * their (superseded but still referenced) manifests are reverted and the data files are physically
+ * deleted, even though the current snapshot's manifest still carries them as EXISTING entries.
+ *
+ * <p>The test constructs that state and checks the invariant that expiration never deletes a data
+ * file referenced by the current snapshot. It is expected to fail on the incremental path and to
+ * pass once {@link IcebergTableMaintainer} detects the off-main snapshot and forces the reachable
+ * cleanup strategy.
+ */
+@RunWith(Parameterized.class)
+public class TestExpireSnapshotsKeepReferencedFiles extends ExecutorTestBase {
+
+  /** Supplies the single parameter set: an Iceberg-format catalog and a basic table helper. */
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    CatalogTestHelper catalogHelper = new BasicCatalogTestHelper(TableFormat.ICEBERG);
+    TableTestHelper tableHelper = new BasicTableTestHelper(false, false);
+    return new Object[][] {{catalogHelper, tableHelper}};
+  }
+
+  public TestExpireSnapshotsKeepReferencedFiles(
+      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper);
+  }
+
+  @Test
+  public void testExpireKeepsFilesReferencedByCurrentSnapshot() {
+    UnkeyedTable unkeyed = getMixedTable().asUnkeyedTable();
+    // Manifests are merged by hand further down, so auto-merge is switched off to keep the
+    // fixture deterministic.
+    unkeyed
+        .updateProperties()
+        .set("commit.manifest-merge.enabled", "false")
+        .set(TableProperties.SNAPSHOT_KEEP_DURATION, "0")
+        .commit();
+
+    // S0 and S1: two appends, one data file each, each in its own manifest (F0 in m0, F1 in m1).
+    DataFile firstFile = appendOneRecord(unkeyed, 1, 1L);
+    DataFile secondFile = appendOneRecord(unkeyed, 2, 2L);
+
+    // S2: collapse the manifests into one merged manifest that lists F0 and F1 as EXISTING.
+    // m0/m1 are superseded from here on but stay referenced by the manifest lists of S0/S1.
+    unkeyed.rewriteManifests().clusterBy(file -> 0).commit();
+    long rewriteSnapshotId = unkeyed.currentSnapshot().snapshotId();
+
+    // S3 (head): append F2. Head still reaches F0 and F1 through the merged manifest.
+    DataFile thirdFile = appendOneRecord(unkeyed, 3, 3L);
+
+    Assert.assertTrue(existsInStorage(unkeyed, firstFile));
+    Assert.assertTrue(existsInStorage(unkeyed, secondFile));
+
+    // Expire S2 by id. Head's parent is S2, so the ancestor chain is now broken and a later
+    // ancestor walk stops before reaching S0/S1. Expiring by snapshot id goes through reachable
+    // cleanup, so the baseline files are expected to be intact at this point.
+    unkeyed
+        .expireSnapshots()
+        .expireSnapshotId(rewriteSnapshotId)
+        .cleanExpiredFiles(true)
+        .commit();
+    Assert.assertTrue(existsInStorage(unkeyed, firstFile));
+    Assert.assertTrue(existsInStorage(unkeyed, secondFile));
+
+    // A single (main) ref is the condition under which iceberg auto-selects incremental cleanup.
+    Assert.assertEquals(1, unkeyed.refs().size());
+
+    IcebergTableMaintainer maintainer =
+        new IcebergTableMaintainer(unkeyed, unkeyed.id(), TestTableMaintainerContext.of(unkeyed));
+    long expireBefore = System.currentTimeMillis();
+    maintainer.expireSnapshots(expireBefore, 1);
+
+    // Sanity: only head is left and its own file is present.
+    Assert.assertEquals(1, Iterables.size(unkeyed.snapshots()));
+    Assert.assertTrue(existsInStorage(unkeyed, thirdFile));
+
+    // Invariant: head still references F0 and F1, so both must have survived expiration.
+    Assert.assertTrue(
+        "F0 is referenced by the current snapshot and must not be deleted by expiration",
+        existsInStorage(unkeyed, firstFile));
+    Assert.assertTrue(
+        "F1 is referenced by the current snapshot and must not be deleted by expiration",
+        existsInStorage(unkeyed, secondFile));
+  }
+
+  /** Returns whether the table's file IO reports that the data file's path exists. */
+  private static boolean existsInStorage(UnkeyedTable table, DataFile file) {
+    return table.io().exists(file.path().toString());
+  }
+
+  /**
+   * Writes one generated record to the base store, commits the resulting file with a single
+   * append, and returns that file.
+   *
+   * @param table table to write to and commit against
+   * @param id record id, also used to derive the record name
+   * @param txId transaction id passed to the base-store writer
+   * @return the one data file produced by the write
+   */
+  private DataFile appendOneRecord(UnkeyedTable table, int id, long txId) {
+    Record row = tableTestHelper().generateTestRecord(id, "name" + id, 0, "2022-01-01T00:00:00");
+    List<DataFile> written =
+        tableTestHelper().writeBaseStore(table, txId, Lists.newArrayList(row), false);
+    Assert.assertEquals(1, written.size());
+    AppendFiles pendingAppend = table.newAppend();
+    for (DataFile dataFile : written) {
+      pendingAppend.appendFile(dataFile);
+    }
+    pendingAppend.commit();
+    return written.get(0);
+  }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index f906d4b86..f121d1fe6 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -46,10 +46,12 @@ import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileCleanupBridge;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
@@ -70,6 +72,7 @@ import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;
+import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,14 +204,24 @@ public class IcebergTableMaintainer implements
TableMaintainer {
minCount,
exclude);
RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(),
exclude);
- table
- .expireSnapshots()
- .retainLast(Math.max(minCount, 1))
- .expireOlderThan(olderThan)
- .deleteWith(expiredFileCleaner::addFile)
- .cleanExpiredFiles(
- true) /* enable clean only for collecting the expired files, will
delete them later */
- .commit();
+ ExpireSnapshots expireSnapshots =
+ table
+ .expireSnapshots()
+ .retainLast(Math.max(minCount, 1))
+ .expireOlderThan(olderThan)
+ .deleteWith(expiredFileCleaner::addFile)
+ .cleanExpiredFiles(
+ true) /* enable clean only for collecting the expired files,
will delete them later */;
+ // iceberg auto-selects IncrementalFileCleanup for single-ref tables. That
strategy walks the
+ // current snapshot's ancestor chain and can terminate silently at a
missing parent, then revert
+ // the ADDED entries of superseded-but-still-referenced manifests below
the break - physically
+ // deleting data files the current snapshot still references (observed as
partition data-file
+ // loss in production). The walk only truncates when a snapshot sits
outside the current main
+ // ancestry, so force the safe ReachableFileCleanup then.
+ if (hasSnapshotsOutsideMainAncestry()) {
+ ReachableFileCleanupBridge.forceReachable(expireSnapshots);
+ }
+ expireSnapshots.commit();
int collectedFiles = expiredFileCleaner.fileCount();
expiredFileCleaner.clear();
@@ -227,6 +240,28 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
+  /**
+   * Reports whether any snapshot in the table metadata lies outside the ancestor chain of the
+   * current snapshot.
+   *
+   * <p>Ancestor ids are collected with {@link SnapshotUtil#ancestorIds}, resolving parents through
+   * {@code table::snapshot}; every snapshot returned by {@code table.snapshots()} is then checked
+   * against that set. Such a snapshot is the state in which {@code IncrementalFileCleanup}'s
+   * ancestor walk truncates and can delete data files the current snapshot still references, so
+   * the caller uses this result to decide whether to force the reachable cleanup strategy.
+   *
+   * @return {@code true} if at least one snapshot is not among the current snapshot's ancestors;
+   *     {@code false} if there is no current snapshot or every snapshot is an ancestor
+   */
+  private boolean hasSnapshotsOutsideMainAncestry() {
+    Snapshot head = table.currentSnapshot();
+    boolean foundOffMain = false;
+    if (head != null) {
+      Iterable<Long> ancestorWalk = SnapshotUtil.ancestorIds(head, table::snapshot);
+      Set<Long> reachableIds = Sets.newHashSet(ancestorWalk);
+      for (Snapshot candidate : table.snapshots()) {
+        boolean onMainLine = reachableIds.contains(candidate.snapshotId());
+        if (!onMainLine) {
+          // One off-main snapshot is enough; stop scanning.
+          foundOffMain = true;
+          break;
+        }
+      }
+    }
+    return foundOffMain;
+  }
+
@Override
public void expireData() {
DataExpirationConfig expirationConfig =
context.getTableConfiguration().getExpiringDataConfig();
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java
new file mode 100644
index 000000000..fbc5eabcc
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Bridge into iceberg-core to force the reachable file-cleanup strategy
during snapshot expiration.
+ *
+ * <p>{@link RemoveSnapshots#withIncrementalCleanup(boolean)} is
package-private, so this class
+ * lives in {@code org.apache.iceberg} to call it without reflection. Binding
at compile time means
+ * a future change to that method's signature breaks the build instead of
silently falling back.
+ *
+ * <p>By default iceberg auto-selects {@link IncrementalFileCleanup} whenever
the table has a single
+ * ref. That strategy walks the current snapshot's ancestor chain and
terminates silently if a
+ * parent snapshot is missing from metadata; snapshots below the break are
then treated as
+ * non-ancestors and the ADDED entries in their
superseded-but-still-referenced manifests are
+ * reverted, physically deleting data files the current snapshot still
references. Forcing {@link
+ * ReachableFileCleanup} avoids the ancestor walk entirely and is safe for any
ref count.
+ */
+public final class ReachableFileCleanupBridge {
+
+ private ReachableFileCleanupBridge() {}
+
+ /**
+ * Forces {@code expireSnapshots} to use {@link ReachableFileCleanup}
instead of letting iceberg
+ * auto-select the incremental strategy.
+ *
+ * @param expireSnapshots the expire operation to configure
+ * @return the same operation, for fluent chaining
+ */
+ public static ExpireSnapshots forceReachable(ExpireSnapshots
expireSnapshots) {
+ if (expireSnapshots instanceof RemoveSnapshots) {
+ ((RemoveSnapshots) expireSnapshots).withIncrementalCleanup(false);
+ }
+ return expireSnapshots;
+ }
+}