This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d76bdb25 [core] when use from-timestamp read data from snapshot, if
time < earliestSnapshot time , no data return (#3904) (#3910)
8d76bdb25 is described below
commit 8d76bdb25914e1db1d19938f4a2fdd16d43187e3
Author: chun.ji <[email protected]>
AuthorDate: Sun Aug 11 21:55:31 2024 +0800
[core] when use from-timestamp read data from snapshot, if time <
earliestSnapshot time , no data return (#3904) (#3910)
---
.../source/snapshot/IncrementalTimeStampStartingScanner.java | 10 +++++++++-
.../src/main/java/org/apache/paimon/utils/SnapshotManager.java | 5 +++--
.../test/java/org/apache/paimon/flink/TimeTravelITCase.java | 6 ++++--
.../paimon/spark/SparkTimeTravelWithDataFrameITCase.java | 2 +-
4 files changed, 17 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
index 9fa59fdd7..bccb0b3d3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -52,8 +52,16 @@ public class IncrementalTimeStampStartingScanner extends
AbstractStartingScanner
|| endTimestamp < earliestSnapshot.timeMillis()) {
return new NoSnapshot();
}
+ // in org.apache.paimon.utils.SnapshotManager.earlierOrEqualTimeMills
+ // 1. if earliestSnapshotId or latestSnapshotId is null startingSnapshotId will be null
+ // 2. if earliestSnapShot.timeMillis() > startTimestamp startingSnapshotId will be
+ // earliestSnapShotId
+ // if earliestSnapShot.timeMillis() > startTimestamp we should include the earliestSnapShot
+ // data
Long startSnapshotId =
- (startingSnapshotId == null) ? earliestSnapshot.id() - 1 :
startingSnapshotId;
+ (startingSnapshotId == null || earliestSnapshot.timeMillis() >
startTimestamp)
+ ? earliestSnapshot.id() - 1
+ : startingSnapshotId;
Snapshot endSnapshot =
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() :
endSnapshot.id();
IncrementalStartingScanner incrementalStartingScanner =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index af83fab6a..1b64d8bc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -270,8 +270,9 @@ public class SnapshotManager implements Serializable {
return null;
}
- if (snapshot(earliest).timeMillis() > timestampMills) {
- return null;
+ Snapshot earliestSnapShot = snapshot(earliest);
+ if (earliestSnapShot.timeMillis() > timestampMills) {
+ return earliestSnapShot;
}
Snapshot finalSnapshot = null;
while (earliest <= latest) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
index 8cbc4d5b1..1c9aa0547 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -112,8 +112,10 @@ public class TimeTravelITCase extends CatalogITCaseBase {
public void testTravelToNonExistedTimestamp() {
sql("CREATE TABLE t (k INT, v STRING)");
sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
- assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP
'1900-01-01 00:00:00'"))
- .isEmpty();
+ assertThat(
+ sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP
'1900-01-01 00:00:00'")
+ .toString())
+ .isEqualTo("[+I[1, hello], +I[2, world]]");
}
@Test
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
index fcb66f8e9..9f613c1b9 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -132,7 +132,7 @@ public class SparkTimeTravelWithDataFrameITCase extends
SparkReadTestBase {
.option("path", tablePath1.toString())
.option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
.load();
- assertThat(dataset.collectAsList()).isEmpty();
+ assertThat(dataset.collectAsList().toString()).isEqualTo("[[1,2,1],
[5,6,3]]");
}
@Test