This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a3cb728a [core] Avoid FileNotFoundException when looking for the
latest snapshot of user (#3906)
0a3cb728a is described below
commit 0a3cb728a2c1f311e7a84bbb1181ca28b7242e1b
Author: rfyu <[email protected]>
AuthorDate: Sun Aug 11 23:24:58 2024 +0800
[core] Avoid FileNotFoundException when looking for the latest snapshot of
user (#3906)
---
.../org/apache/paimon/utils/SnapshotManager.java | 33 ++++++++++++++-
.../apache/paimon/utils/SnapshotManagerTest.java | 49 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 1b64d8bc8..c5fdb042e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -24,6 +24,9 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -56,6 +59,8 @@ public class SnapshotManager implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotManager.class);
+
private static final String SNAPSHOT_PREFIX = "snapshot-";
private static final String CHANGELOG_PREFIX = "changelog-";
public static final String EARLIEST = "EARLIEST";
@@ -523,7 +528,31 @@ public class SnapshotManager implements Serializable {
"Latest snapshot id is not null, but earliest snapshot
id is null. "
+ "This is unexpected.");
for (long id = latestId; id >= earliestId; id--) {
- Snapshot snapshot = snapshot(id);
+ Snapshot snapshot;
+ try {
+ snapshot = snapshot(id);
+ } catch (Exception e) {
+ long newEarliestId =
+ Preconditions.checkNotNull(
+ earliestSnapshotId(),
+ "Latest snapshot id is not null, but earliest
snapshot id is null. "
+ + "This is unexpected.");
+
+ // this is a valid snapshot, should throw exception
+ if (id >= newEarliestId) {
+ throw e;
+ }
+
+ // this is an expired snapshot
+ LOG.warn(
+ "Snapshot #"
+ + id
+ + " is expired. The latest snapshot of current
user("
+ + user
+ + ") is not found.");
+ break;
+ }
+
if (user.equals(snapshot.commitUser())) {
return Optional.of(snapshot);
}
@@ -594,7 +623,7 @@ public class SnapshotManager implements Serializable {
return null;
}
- // this is a valid snapshot, should not throw exception
+ // this is a valid snapshot, should throw exception
if (id >= newEarliestId) {
throw e;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index b1e038480..4e0b18d47 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -199,6 +199,55 @@ public class SnapshotManagerTest {
null));
}
+ @Test
+ public void testLatestSnapshotOfUser() throws IOException,
InterruptedException {
+ FileIO localFileIO = LocalFileIO.create();
+ SnapshotManager snapshotManager =
+ new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ // create 100 snapshots using user "lastCommitUser"
+ for (long i = 0; i < 100; i++) {
+ Snapshot snapshot =
+ new Snapshot(
+ i,
+ 0L,
+ null,
+ null,
+ null,
+ null,
+ "lastCommitUser",
+ 0L,
+ Snapshot.CommitKind.APPEND,
+ i * 1000,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
+ }
+
+ // read the latest snapshot of user "currentCommitUser"
+ AtomicReference<Exception> exception = new AtomicReference<>();
+ Thread thread =
+ new Thread(
+ () -> {
+ try {
+
snapshotManager.latestSnapshotOfUser("currentCommitUser");
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ });
+ thread.start();
+ Thread.sleep(100);
+
+ // expire snapshot
+ localFileIO.deleteQuietly(snapshotManager.snapshotPath(0));
+ thread.join();
+
+ assertThat(exception.get()).isNull();
+ }
+
@Test
public void testTraversalSnapshotsFromLatestSafely() throws IOException,
InterruptedException {
FileIO localFileIO = LocalFileIO.create();