This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6bf7ddccac [core] Throw exception when travel to timestamp before the
earliest snapshot (#5423)
6bf7ddccac is described below
commit 6bf7ddccac44e285f338b17e25507c2ed7d110dd
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 9 10:28:32 2025 +0800
[core] Throw exception when travel to timestamp before the earliest
snapshot (#5423)
---
.../StaticFromSnapshotStartingScanner.java | 6 ++----
.../StaticFromTimestampStartingScanner.java | 18 ++++++++++--------
.../org/apache/paimon/utils/SnapshotManager.java | 4 ++--
.../apache/paimon/utils/SnapshotManagerTest.java | 14 ++++++--------
.../org/apache/paimon/flink/TimeTravelITCase.java | 16 +++++++++++-----
.../apache/paimon/spark/SparkTimeTravelITCase.java | 22 ++++++++++++++++------
.../spark/SparkTimeTravelWithDataFrameITCase.java | 21 +++++++++++++--------
7 files changed, 60 insertions(+), 41 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 19b9da2440..3621259a6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -52,12 +52,10 @@ public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (earliestSnapshotId == null || latestSnapshotId == null) {
- LOG.warn("There is currently no snapshot. Waiting for snapshot
generation.");
- return null;
+ throw new IllegalArgumentException("There is currently no
snapshot.");
}
- // Checks earlier whether the specified scan snapshot id is valid and
throws the correct
- // exception.
+ // Checks earlier whether the specified scan snapshot id is valid.
checkArgument(
startingSnapshotId >= earliestSnapshotId && startingSnapshotId
<= latestSnapshotId,
"The specified scan snapshotId %s is out of available
snapshotId range [%s, %s].",
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index a5087890a9..0f20938992 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -43,19 +43,21 @@ public class StaticFromTimestampStartingScanner extends ReadPlanStartingScanner
super(snapshotManager);
this.startupMillis = startupMillis;
Snapshot snapshot = timeTravelToTimestamp(snapshotManager,
startupMillis);
- if (snapshot != null) {
- this.startingSnapshotId = snapshot.id();
+ if (snapshot == null) {
+ Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+ throw new IllegalArgumentException(
+ String.format(
+ "There is currently no snapshot earlier than or
equal to timestamp [%s], the earliest snapshot's timestamp is [%s]",
+ startupMillis,
+ earliestSnapshot == null
+ ? "null"
+ :
String.valueOf(earliestSnapshot.timeMillis())));
}
+ this.startingSnapshotId = snapshot.id();
}
@Override
public SnapshotReader configure(SnapshotReader snapshotReader) {
- if (startingSnapshotId == null) {
- LOG.debug(
- "There is currently no snapshot earlier than or equal to
timestamp[{}]",
- startupMillis);
- return null;
- }
return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 42c8b2ba71..09dcc567b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -264,7 +264,7 @@ public class SnapshotManager implements Serializable {
}
/**
- * Returns a {@link Snapshot} whoes commit time is earlier than or equal
to given timestamp
+ * Returns a {@link Snapshot} whose commit time is earlier than or equal
to given timestamp
* mills. If there is no such a snapshot, returns null.
*/
public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
@@ -275,7 +275,7 @@ public class SnapshotManager implements Serializable {
Snapshot earliestSnapShot = earliestSnapshot(latest);
if (earliestSnapShot == null || earliestSnapShot.timeMillis() >
timestampMills) {
- return earliestSnapShot;
+ return null;
}
long earliest = earliestSnapShot.id();
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 45ec66000c..ed5badb764 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -176,19 +176,17 @@ public class SnapshotManagerTest {
}
if (isRaceCondition) {
- // The earliest snapshot has expired, so always return the second
snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L).timeMillis())
- .isEqualTo(millis + 1000L);
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999).timeMillis())
- .isEqualTo(millis + 1000L);
+ // The earliest snapshot has expired, so always return the second
snapshot, smaller than
+ // the second snapshot return null
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L)).isEqualTo(null);
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999)).isEqualTo(null);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1000).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
1001).timeMillis())
.isEqualTo(millis + 1000L);
} else {
- // there is no snapshot smaller than "millis - 1L" return the
earliest snapshot
- assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L).timeMillis())
- .isEqualTo(millis);
+ // there is no snapshot smaller than "millis - 1L" return null
+ assertThat(snapshotManager.earlierOrEqualTimeMills(millis -
1L)).isEqualTo(null);
// smaller than the second snapshot return the first snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis +
999).timeMillis())
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
index 1c9aa0547c..f3707f3258 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -27,7 +27,9 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
+import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT case for flink time travel. */
public class TimeTravelITCase extends CatalogITCaseBase {
@@ -109,13 +111,17 @@ public class TimeTravelITCase extends CatalogITCaseBase {
}
@Test
- public void testTravelToNonExistedTimestamp() {
+ public void testTravelToTimestampBeforeTheEarliestSnapshot() {
sql("CREATE TABLE t (k INT, v STRING)");
sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
- assertThat(
- sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP
'1900-01-01 00:00:00'")
- .toString())
- .isEqualTo("[+I[1, hello], +I[2, world]]");
+ assertThatThrownBy(
+ () ->
+ sql("SELECT * FROM t FOR SYSTEM_TIME AS OF
TIMESTAMP '1900-01-01 00:00:00'")
+ .toString())
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "There is currently no snapshot earlier than
or equal to timestamp"));
}
@Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 16a38c3a33..8e13cb18f5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -151,19 +151,29 @@ public class SparkTimeTravelITCase extends SparkReadTestBase {
public void testTravelToNonExistedVersion() {
spark.sql("CREATE TABLE t (k INT, v STRING)");
- assertThat(spark.sql("SELECT * FROM t VERSION AS OF
2").collectAsList()).isEmpty();
+ assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF
2").collectAsList())
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class, "There is
currently no snapshot."));
}
@Test
- public void testTravelToNonExistedTimestamp() {
+ public void testTravelToTimestampBeforeTheEarliestSnapshot() {
long anchor = System.currentTimeMillis() / 1000;
spark.sql("CREATE TABLE t (k INT, v STRING)");
- assertThat(
- spark.sql(String.format("SELECT * FROM t TIMESTAMP AS
OF %s", anchor))
- .collectAsList())
- .isEmpty();
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ String.format(
+ "SELECT * FROM t
TIMESTAMP AS OF %s",
+ anchor))
+ .collectAsList())
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "There is currently no snapshot earlier than
or equal to timestamp"));
}
@Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
index 9f613c1b92..518813b35e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -125,14 +125,19 @@ public class SparkTimeTravelWithDataFrameITCase extends SparkReadTestBase {
}
@Test
- public void testTravelToNonExistedTimestamp() {
- Dataset<Row> dataset =
- spark.read()
- .format("paimon")
- .option("path", tablePath1.toString())
- .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
- .load();
- assertThat(dataset.collectAsList().toString()).isEqualTo("[[1,2,1],
[5,6,3]]");
+ public void testTravelToTimestampBeforeTheEarliestSnapshot() {
+ assertThatThrownBy(
+ () ->
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+
.option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
+ .load()
+ .collectAsList())
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "There is currently no snapshot earlier than
or equal to timestamp [0]"));
}
@Test