This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new df830aac9d [core] Changelog expiration skip expired changes to avoid
FileNotFoundException (#7923)
df830aac9d is described below
commit df830aac9dda6dfa82a43e110afcc70a56250bd9
Author: yuzelin <[email protected]>
AuthorDate: Fri May 22 20:10:49 2026 +0800
[core] Changelog expiration skip expired changes to avoid
FileNotFoundException (#7923)
---
.../apache/paimon/table/ExpireChangelogImpl.java | 52 +++++++++++++++-------
.../apache/paimon/table/ChangelogExpireTest.java | 50 +++++++++++++++++++++
2 files changed, 87 insertions(+), 15 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index bed5749895..73f9c88b5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -130,10 +130,20 @@ public class ExpireChangelogImpl implements
ExpireSnapshots {
// Only clean the snapshot in changelog dir
maxExclusive = Math.min(maxExclusive, latestChangelogId);
+ if (maxExclusive <= earliestChangelogId) {
+ // This happens when retainMin >= total changelog count
+ // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId)
+ return 0;
+ }
+
for (long id = min; id <= maxExclusive; id++) {
- if (changelogManager.longLivedChangelogExists(id)
- && olderThanMills <=
changelogManager.longLivedChangelog(id).timeMillis()) {
- return expireUntil(earliestChangelogId, id);
+ try {
+ if (olderThanMills <=
changelogManager.tryGetChangelog(id).timeMillis()) {
+ return expireUntil(earliestChangelogId, id);
+ }
+ } catch (FileNotFoundException e) {
+ // ignore
+ // the changelog may have been deleted by another process
}
}
return expireUntil(earliestChangelogId, maxExclusive);
@@ -141,29 +151,42 @@ public class ExpireChangelogImpl implements
ExpireSnapshots {
public int expireUntil(long earliestId, long endExclusiveId) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Changelog expire range is [" + earliestId + ", " +
endExclusiveId + ")");
+ LOG.debug("Changelog expire range is [{}, {})", earliestId,
endExclusiveId);
}
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> skippingSnapshots =
findSkippingTags(taggedSnapshots, earliestId, endExclusiveId);
- skippingSnapshots.add(changelogManager.changelog(endExclusiveId));
+ try {
+
skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId));
+ } catch (FileNotFoundException e) {
+ LOG.error(
+ "The endExclusive changelog #{} not found, skip
expiration. Maybe you should use expire_changelogs to delete the separated
changelogs.",
+ endExclusiveId,
+ e);
+ return 0;
+ }
skippingSnapshots.add(snapshotManager.earliestSnapshot());
Set<String> manifestSkippSet =
changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to delete changelog files from changelog #" +
id);
+ LOG.debug("Ready to delete changelog files from changelog
#{}", id);
+ }
+ Changelog changelog;
+ try {
+ changelog = changelogManager.tryGetChangelog(id);
+ } catch (FileNotFoundException e) {
+ LOG.info("Changelog #{} not found, skip it.", id, e);
+ continue;
}
- Changelog changelog = changelogManager.longLivedChangelog(id);
Predicate<ExpireFileEntry> skipper;
try {
skipper =
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
- String.format(
- "Skip cleaning data files of changelog '%s'
due to failed to build skipping set.",
- id),
+ "Skip cleaning data files of changelog #{} due to
failed to build skipping set.",
+ id,
e);
continue;
}
@@ -219,13 +242,13 @@ public class ExpireChangelogImpl implements
ExpireSnapshots {
Set<String> manifestSkippSet =
changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestChangelogId; id <= latestChangelogId; id++) {
- LOG.info("Ready to delete changelog files from changelog #" + id);
+ LOG.info("Ready to delete changelog files from changelog #{}", id);
Changelog changelog;
try {
changelog = changelogManager.tryGetChangelog(id);
} catch (FileNotFoundException e) {
- LOG.info("fail to get changelog #" + id);
+ LOG.info("fail to get changelog #{}", id);
continue;
}
Predicate<ExpireFileEntry> skipper;
@@ -233,9 +256,8 @@ public class ExpireChangelogImpl implements ExpireSnapshots
{
skipper =
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
- String.format(
- "Skip cleaning data files of changelog '%s'
due to failed to build skipping set.",
- id),
+ "Skip cleaning data files of changelog '{}' due to
failed to build skipping set.",
+ id,
e);
continue;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
index 1821d927c4..281dd0f1a6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
@@ -41,6 +42,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -143,4 +145,52 @@ public class ChangelogExpireTest extends
IndexFileExpireTableTest {
assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue();
}
}
+
+ @Test
+ public void testExpireWithMiddleChangelogNotFound() throws Exception {
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = writeBuilder.newWrite();
+ StreamTableCommit commit = writeBuilder.newCommit();
+ for (int i = 1; i <= 10; i++) {
+ write(write, createRow(1, 0, i, i * 10));
+ commit.commit(i, write.prepareCommit(true, i));
+ }
+ write.close();
+ commit.close();
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+ long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+ // changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true
+ ExpireConfig expireConfig =
+ ExpireConfig.builder()
+ .changelogRetainMax((int) latestSnapshotId)
+ .changelogRetainMin(1)
+ .changelogTimeRetain(Duration.ofMillis(0))
+ .snapshotRetainMax(1)
+ .snapshotRetainMin(1)
+ .build();
+ ExpireSnapshotsImpl expireSnapshots =
+ (ExpireSnapshotsImpl)
table.newExpireSnapshots().config(expireConfig);
+ expireSnapshots.expire();
+
+ ChangelogManager changelogManager = table.changelogManager();
+ FileIO fileIO = table.fileIO();
+ long latestChangelogId = changelogManager.latestLongLivedChangelogId();
+ long earliestChangelogId =
changelogManager.earliestLongLivedChangelogId();
+
+ // Delete a middle changelog to simulate concurrent deletion
+ long middleId = (earliestChangelogId + latestChangelogId) / 2;
+
assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(middleId))).isTrue();
+
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(middleId));
+
+ ExpireChangelogImpl expire =
+ (ExpireChangelogImpl)
table.newExpireChangelog().config(expireConfig);
+
+ // should not throw even though a middle changelog is missing
+ assertThatCode(expire::expire).doesNotThrowAnyException();
+
+ // earliest should be advanced past the deleted range
+
assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(latestChangelogId);
+ }
}