This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2504732549 [core] Fix race condition for earliest snapshot (#4930)
2504732549 is described below
commit 2504732549660e7ed7948f88aa255bc4f7710ebb
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Feb 12 22:26:45 2025 +0800
[core] Fix race condition for earliest snapshot (#4930)
---
.../src/main/java/org/apache/paimon/Changelog.java | 11 ++
.../org/apache/paimon/utils/SnapshotManager.java | 103 +++++++++++----
.../apache/paimon/utils/SnapshotManagerTest.java | 145 +++++++++++++++++----
3 files changed, 212 insertions(+), 47 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index 8c6295b44c..79c65ba570 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -28,6 +28,7 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonPro
import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
@@ -105,9 +106,19 @@ public class Changelog extends Snapshot {
}
public static Changelog fromPath(FileIO fileIO, Path path) {
+ try {
+ return tryFromPath(fileIO, path);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException("Fails to read changelog from path " +
path, e);
+ }
+ }
+
+ public static Changelog tryFromPath(FileIO fileIO, Path path) throws
FileNotFoundException {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " +
path, e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5257cf1c12..8eee46d9a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -168,6 +168,11 @@ public class SnapshotManager implements Serializable {
return Changelog.fromPath(fileIO, changelogPath);
}
+ private Changelog tryGetChangelog(long snapshotId) throws
FileNotFoundException {
+ Path changelogPath = longLivedChangelogPath(snapshotId);
+ return Changelog.tryFromPath(fileIO, changelogPath);
+ }
+
public Changelog longLivedChangelog(long snapshotId) {
return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
}
@@ -216,8 +221,41 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot earliestSnapshot() {
- Long snapshotId = earliestSnapshotId();
- return snapshotId == null ? null : snapshot(snapshotId);
+ return earliestSnapshot(false);
+ }
+
+ private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
+ Long snapshotId = null;
+ if (includeChangelog) {
+ snapshotId = earliestLongLivedChangelogId();
+ }
+ if (snapshotId == null) {
+ snapshotId = earliestSnapshotId();
+ }
+ if (snapshotId == null) {
+ return null;
+ }
+
+ FunctionWithException<Long, Snapshot, FileNotFoundException>
snapshotFunction =
+ includeChangelog ? this::tryGetChangelogOrSnapshot :
this::tryGetSnapshot;
+
+ // Losing the earliest snapshot between looking up its id and reading it is rare,
+ // so only a small number of retries is needed here.
+ int retry = 0;
+ do {
+ try {
+ return snapshotFunction.apply(snapshotId);
+ } catch (FileNotFoundException e) {
+ if (retry++ >= 3) {
+ throw new RuntimeException(e);
+ }
+ LOG.warn(
+ "The earliest snapshot or changelog was once
identified but disappeared. "
+ + "It might have been expired by other jobs
operating on this table. "
+ + "Searching for the second earliest snapshot
or changelog instead. ");
+ snapshotId++;
+ }
+ } while (true);
}
public @Nullable Long earliestSnapshotId() {
@@ -276,28 +314,34 @@ public class SnapshotManager implements Serializable {
}
}
+ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws
FileNotFoundException {
+ if (longLivedChangelogExists(snapshotId)) {
+ return tryGetChangelog(snapshotId);
+ } else {
+ return tryGetSnapshot(snapshotId);
+ }
+ }
+
/**
* Returns the id of the latest snapshot earlier than the given timestamp in millis. A
* non-existent snapshot id may be returned if all snapshots are equal to or later than it.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean
startFromChangelog) {
- Long earliestSnapshot = earliestSnapshotId();
- Long earliest;
- if (startFromChangelog) {
- Long earliestChangelog = earliestLongLivedChangelogId();
- earliest = earliestChangelog == null ? earliestSnapshot :
earliestChangelog;
- } else {
- earliest = earliestSnapshot;
- }
Long latest = latestSnapshotId();
- if (earliest == null || latest == null) {
+ if (latest == null) {
+ return null;
+ }
+
+ Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
+ if (earliestSnapshot == null) {
return null;
}
- if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
- return earliest - 1;
+ if (earliestSnapshot.timeMillis() >= timestampMills) {
+ return earliestSnapshot.id() - 1;
}
+ long earliest = earliestSnapshot.id();
while (earliest < latest) {
long mid = (earliest + latest + 1) / 2;
if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
@@ -314,16 +358,17 @@ public class SnapshotManager implements Serializable {
* mills. If there is no such snapshot, returns null.
*/
public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
- Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
- if (earliest == null || latest == null) {
+ if (latest == null) {
return null;
}
- Snapshot earliestSnapShot = snapshot(earliest);
- if (earliestSnapShot.timeMillis() > timestampMills) {
+ Snapshot earliestSnapShot = earliestSnapshot();
+ if (earliestSnapShot == null || earliestSnapShot.timeMillis() >
timestampMills) {
return earliestSnapShot;
}
+ long earliest = earliestSnapShot.id();
+
Snapshot finalSnapshot = null;
while (earliest <= latest) {
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
@@ -376,16 +421,22 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
- Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
// If the latest snapshot's watermark is Long.MIN_VALUE, skip the binary search for the
// watermark, which reduces the IO cost of reading snapshots
- if (earliest == null || latest == null || snapshot(latest).watermark()
== Long.MIN_VALUE) {
+ if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}
+
+ Snapshot earliestSnapShot = earliestSnapshot();
+ if (earliestSnapShot == null) {
+ return null;
+ }
+ long earliest = earliestSnapShot.id();
+
Long earliestWatermark = null;
// find the first snapshot with watermark
- if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
+ if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
@@ -435,16 +486,22 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
- Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
// If the latest snapshot's watermark is Long.MIN_VALUE, skip the binary search for the
// watermark, which reduces the IO cost of reading snapshots
- if (earliest == null || latest == null || snapshot(latest).watermark()
== Long.MIN_VALUE) {
+ if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}
+
+ Snapshot earliestSnapShot = earliestSnapshot();
+ if (earliestSnapShot == null) {
+ return null;
+ }
+ long earliest = earliestSnapShot.id();
+
Long earliestWatermark = null;
// find the first snapshot with watermark
- if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
+ if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index e828a0c90a..20b4557043 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -27,6 +27,10 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -56,8 +60,42 @@ public class SnapshotManagerTest {
}
}
- @Test
- public void testEarlierThanTimeMillis() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEarliestSnapshot(boolean isRaceCondition) throws
IOException {
+ long millis = 1684726826L;
+ FileIO localFileIO = LocalFileIO.create();
+ SnapshotManager snapshotManager =
+ new TestSnapshotManager(localFileIO, new
Path(tempDir.toString()), isRaceCondition);
+ // create 10 snapshots
+ for (long i = 0; i < 10; i++) {
+ Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
+ localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
+ }
+
+
assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(isRaceCondition ?
1 : 0);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws
IOException {
+ long millis = 1684726826L;
+ FileIO localFileIO = LocalFileIO.create();
+ SnapshotManager snapshotManager =
+ new TestSnapshotManager(localFileIO, new
Path(tempDir.toString()), isRaceCondition);
+ // create 10 snapshots
+ for (long i = 0; i < 10; i++) {
+ Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000,
millis + i * 1000);
+ localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
+ }
+
+ assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999).id())
+ .isEqualTo(isRaceCondition ? 1 : 0);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws
IOException {
long base = System.currentTimeMillis();
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -70,7 +108,7 @@ public class SnapshotManagerTest {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ new TestSnapshotManager(localFileIO, new
Path(tempDir.toString()), isRaceCondition);
int firstSnapshotId = random.nextInt(1, 100);
for (int i = 0; i < numSnapshots; i++) {
Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i,
millis.get(i));
@@ -90,11 +128,24 @@ public class SnapshotManagerTest {
Long actual = snapshotManager.earlierThanTimeMills(time, false);
if (millis.get(numSnapshots - 1) < time) {
- assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots -
1);
+ if (isRaceCondition && millis.size() == 1) {
+ assertThat(actual).isNull();
+ } else {
+ assertThat(actual).isEqualTo(firstSnapshotId +
numSnapshots - 1);
+ }
} else {
for (int i = 0; i < numSnapshots; i++) {
if (millis.get(i) >= time) {
- assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
+ if (isRaceCondition && i == 0) {
+ // The first snapshot expired during invocation
+ if (millis.size() == 1) {
+ assertThat(actual).isNull();
+ } else {
+
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
+ }
+ } else {
+
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId + i - 1);
+ }
break;
}
}
@@ -102,31 +153,45 @@ public class SnapshotManagerTest {
}
}
- @Test
- public void testEarlierOrEqualTimeMills() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws
IOException {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ new TestSnapshotManager(localFileIO, new
Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
}
- // there is no snapshot smaller than "millis - 1L" return the earliest
snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L).timeMillis())
- .isEqualTo(millis);
-
- // smaller than the second snapshot return the first snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999).timeMillis())
- .isEqualTo(millis);
- // equal to the second snapshot return the second snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1000).timeMillis())
- .isEqualTo(millis + 1000);
- // larger than the second snapshot return the second snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1001).timeMillis())
- .isEqualTo(millis + 1000);
+ if (isRaceCondition) {
+ // The earliest snapshot has expired, so always return the second
snapshot
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L).timeMillis())
+ .isEqualTo(millis + 1000L);
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999).timeMillis())
+ .isEqualTo(millis + 1000L);
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1000).timeMillis())
+ .isEqualTo(millis + 1000L);
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1001).timeMillis())
+ .isEqualTo(millis + 1000L);
+ } else {
+ // there is no snapshot smaller than "millis - 1L" return the
earliest snapshot
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L).timeMillis())
+ .isEqualTo(millis);
+
+ // smaller than the second snapshot return the first snapshot
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999).timeMillis())
+ .isEqualTo(millis);
+
+ // equal to the second snapshot return the second snapshot
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1000).timeMillis())
+ .isEqualTo(millis + 1000);
+ // larger than the second snapshot return the second snapshot
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1001).timeMillis())
+ .isEqualTo(millis + 1000);
+ }
}
@Test
@@ -154,12 +219,13 @@ public class SnapshotManagerTest {
assertThat(snapshotManager.laterOrEqualTimeMills(millis +
10001)).isNull();
}
- @Test
- public void testlaterOrEqualWatermark() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testLaterOrEqualWatermark(boolean isRaceCondition) throws
IOException {
long millis = Long.MIN_VALUE;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ new TestSnapshotManager(localFileIO, new
Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis,
Long.MIN_VALUE);
@@ -410,4 +476,35 @@ public class SnapshotManagerTest {
snapshotManager.commitChangelog(changelog, id);
assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog,
id));
}
+
+ /**
+ * A {@link SnapshotManager} for tests that simulates a race condition in which the
+ * earliest snapshot is deleted in the middle of the current thread's invocation, as if
+ * by another thread.
+ */
+ private static class TestSnapshotManager extends SnapshotManager {
+ private final boolean isRaceCondition;
+
+ private boolean deleteEarliestSnapshot = false;
+
+ public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean
isRaceCondition) {
+ super(fileIO, tablePath);
+ this.isRaceCondition = isRaceCondition;
+ }
+
+ @Override
+ public @Nullable Long earliestSnapshotId() {
+ Long snapshotId = super.earliestSnapshotId();
+ if (isRaceCondition && snapshotId != null &&
!deleteEarliestSnapshot) {
+ Path snapshotPath = snapshotPath(snapshotId);
+ try {
+ fileIO().delete(snapshotPath, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ deleteEarliestSnapshot = true;
+ }
+ return snapshotId;
+ }
+ }
}