This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df830aac9d [core] Changelog expiration skips expired changelogs to avoid FileNotFoundException (#7923)
df830aac9d is described below

commit df830aac9dda6dfa82a43e110afcc70a56250bd9
Author: yuzelin <[email protected]>
AuthorDate: Fri May 22 20:10:49 2026 +0800

    [core] Changelog expiration skips expired changelogs to avoid FileNotFoundException (#7923)
---
 .../apache/paimon/table/ExpireChangelogImpl.java   | 52 +++++++++++++++-------
 .../apache/paimon/table/ChangelogExpireTest.java   | 50 +++++++++++++++++++++
 2 files changed, 87 insertions(+), 15 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index bed5749895..73f9c88b5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -130,10 +130,20 @@ public class ExpireChangelogImpl implements 
ExpireSnapshots {
         // Only clean the snapshot in changelog dir
         maxExclusive = Math.min(maxExclusive, latestChangelogId);
 
+        if (maxExclusive <= earliestChangelogId) {
+            // This happens when retainMin >= total changelog count
+            // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId)
+            return 0;
+        }
+
         for (long id = min; id <= maxExclusive; id++) {
-            if (changelogManager.longLivedChangelogExists(id)
-                    && olderThanMills <= 
changelogManager.longLivedChangelog(id).timeMillis()) {
-                return expireUntil(earliestChangelogId, id);
+            try {
+                if (olderThanMills <= 
changelogManager.tryGetChangelog(id).timeMillis()) {
+                    return expireUntil(earliestChangelogId, id);
+                }
+            } catch (FileNotFoundException e) {
+                // ignore
+                // snapshot may have been deleted by another process
             }
         }
         return expireUntil(earliestChangelogId, maxExclusive);
@@ -141,29 +151,42 @@ public class ExpireChangelogImpl implements 
ExpireSnapshots {
 
     public int expireUntil(long earliestId, long endExclusiveId) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Changelog expire range is [" + earliestId + ", " + 
endExclusiveId + ")");
+            LOG.debug("Changelog expire range is [{}, {})", earliestId, 
endExclusiveId);
         }
 
         List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
 
         List<Snapshot> skippingSnapshots =
                 findSkippingTags(taggedSnapshots, earliestId, endExclusiveId);
-        skippingSnapshots.add(changelogManager.changelog(endExclusiveId));
+        try {
+            
skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId));
+        } catch (FileNotFoundException e) {
+            LOG.error(
+                    "The endExclusive changelog #{} not found, skip 
expiration. Maybe you should use expire_changelogs to delete the separated 
changelogs.",
+                    endExclusiveId,
+                    e);
+            return 0;
+        }
         skippingSnapshots.add(snapshotManager.earliestSnapshot());
         Set<String> manifestSkippSet = 
changelogDeletion.manifestSkippingSet(skippingSnapshots);
         for (long id = earliestId; id < endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Ready to delete changelog files from changelog #" + 
id);
+                LOG.debug("Ready to delete changelog files from changelog 
#{}", id);
+            }
+            Changelog changelog;
+            try {
+                changelog = changelogManager.tryGetChangelog(id);
+            } catch (FileNotFoundException e) {
+                LOG.info("Changelog #{} not found, skip it.", id, e);
+                continue;
             }
-            Changelog changelog = changelogManager.longLivedChangelog(id);
             Predicate<ExpireFileEntry> skipper;
             try {
                 skipper = 
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
             } catch (Exception e) {
                 LOG.info(
-                        String.format(
-                                "Skip cleaning data files of changelog '%s' 
due to failed to build skipping set.",
-                                id),
+                        "Skip cleaning data files of changelog #{} due to 
failed to build skipping set.",
+                        id,
                         e);
                 continue;
             }
@@ -219,13 +242,13 @@ public class ExpireChangelogImpl implements 
ExpireSnapshots {
         Set<String> manifestSkippSet = 
changelogDeletion.manifestSkippingSet(skippingSnapshots);
         for (long id = earliestChangelogId; id <= latestChangelogId; id++) {
 
-            LOG.info("Ready to delete changelog files from changelog #" + id);
+            LOG.info("Ready to delete changelog files from changelog #{}", id);
 
             Changelog changelog;
             try {
                 changelog = changelogManager.tryGetChangelog(id);
             } catch (FileNotFoundException e) {
-                LOG.info("fail to get changelog #" + id);
+                LOG.info("fail to get changelog #{}", id);
                 continue;
             }
             Predicate<ExpireFileEntry> skipper;
@@ -233,9 +256,8 @@ public class ExpireChangelogImpl implements ExpireSnapshots 
{
                 skipper = 
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
             } catch (Exception e) {
                 LOG.info(
-                        String.format(
-                                "Skip cleaning data files of changelog '%s' 
due to failed to build skipping set.",
-                                id),
+                        "Skip cleaning data files of changelog '{}' due to 
failed to build skipping set.",
+                        id,
                         e);
                 continue;
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
index 1821d927c4..281dd0f1a6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
@@ -41,6 +42,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -143,4 +145,52 @@ public class ChangelogExpireTest extends 
IndexFileExpireTableTest {
             
assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue();
         }
     }
+
+    @Test
+    public void testExpireWithMiddleChangelogNotFound() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = writeBuilder.newWrite();
+        StreamTableCommit commit = writeBuilder.newCommit();
+        for (int i = 1; i <= 10; i++) {
+            write(write, createRow(1, 0, i, i * 10));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+        write.close();
+        commit.close();
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        // changelogRetainMax > snapshotRetainMax to ensure 
changelogDecoupled=true
+        ExpireConfig expireConfig =
+                ExpireConfig.builder()
+                        .changelogRetainMax((int) latestSnapshotId)
+                        .changelogRetainMin(1)
+                        .changelogTimeRetain(Duration.ofMillis(0))
+                        .snapshotRetainMax(1)
+                        .snapshotRetainMin(1)
+                        .build();
+        ExpireSnapshotsImpl expireSnapshots =
+                (ExpireSnapshotsImpl) 
table.newExpireSnapshots().config(expireConfig);
+        expireSnapshots.expire();
+
+        ChangelogManager changelogManager = table.changelogManager();
+        FileIO fileIO = table.fileIO();
+        long latestChangelogId = changelogManager.latestLongLivedChangelogId();
+        long earliestChangelogId = 
changelogManager.earliestLongLivedChangelogId();
+
+        // Delete a middle changelog to simulate concurrent deletion
+        long middleId = (earliestChangelogId + latestChangelogId) / 2;
+        
assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(middleId))).isTrue();
+        
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(middleId));
+
+        ExpireChangelogImpl expire =
+                (ExpireChangelogImpl) 
table.newExpireChangelog().config(expireConfig);
+
+        // should not throw even though a middle changelog is missing
+        assertThatCode(expire::expire).doesNotThrowAnyException();
+
+        // earliest should be advanced past the deleted range
+        
assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(latestChangelogId);
+    }
 }

Reply via email to