This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d76bdb25 [core] when use from-timestamp read data from snapshot, if time < earliestSnapshot time , no data return (#3904) (#3910)
8d76bdb25 is described below

commit 8d76bdb25914e1db1d19938f4a2fdd16d43187e3
Author: chun.ji <[email protected]>
AuthorDate: Sun Aug 11 21:55:31 2024 +0800

    [core] when use from-timestamp read data from snapshot, if time < earliestSnapshot time , no data return (#3904) (#3910)
---
 .../source/snapshot/IncrementalTimeStampStartingScanner.java   | 10 +++++++++-
 .../src/main/java/org/apache/paimon/utils/SnapshotManager.java |  5 +++--
 .../test/java/org/apache/paimon/flink/TimeTravelITCase.java    |  6 ++++--
 .../paimon/spark/SparkTimeTravelWithDataFrameITCase.java       |  2 +-
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
index 9fa59fdd7..bccb0b3d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -52,8 +52,16 @@ public class IncrementalTimeStampStartingScanner extends AbstractStartingScanner
                 || endTimestamp < earliestSnapshot.timeMillis()) {
             return new NoSnapshot();
         }
+        // in org.apache.paimon.utils.SnapshotManager.earlierOrEqualTimeMills
+        // 1. if earliestSnapshotId or latestSnapshotId is null startingSnapshotId will be null
+        // 2. if earliestSnapShot.timeMillis() > startTimestamp startingSnapshotId will be
+        // earliestSnapShotId
+        // if  earliestSnapShot.timeMillis() > startTimestamp we should include the earliestSnapShot
+        // data
         Long startSnapshotId =
-                (startingSnapshotId == null) ? earliestSnapshot.id() - 1 : 
startingSnapshotId;
+                (startingSnapshotId == null || earliestSnapshot.timeMillis() > 
startTimestamp)
+                        ? earliestSnapshot.id() - 1
+                        : startingSnapshotId;
         Snapshot endSnapshot = 
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
         Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : 
endSnapshot.id();
         IncrementalStartingScanner incrementalStartingScanner =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index af83fab6a..1b64d8bc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -270,8 +270,9 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
-        if (snapshot(earliest).timeMillis() > timestampMills) {
-            return null;
+        Snapshot earliestSnapShot = snapshot(earliest);
+        if (earliestSnapShot.timeMillis() > timestampMills) {
+            return earliestSnapShot;
         }
         Snapshot finalSnapshot = null;
         while (earliest <= latest) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
index 8cbc4d5b1..1c9aa0547 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -112,8 +112,10 @@ public class TimeTravelITCase extends CatalogITCaseBase {
     public void testTravelToNonExistedTimestamp() {
         sql("CREATE TABLE t (k INT, v STRING)");
         sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
-        assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP 
'1900-01-01 00:00:00'"))
-                .isEmpty();
+        assertThat(
+                        sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP 
'1900-01-01 00:00:00'")
+                                .toString())
+                .isEqualTo("[+I[1, hello], +I[2, world]]");
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
index fcb66f8e9..9f613c1b9 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -132,7 +132,7 @@ public class SparkTimeTravelWithDataFrameITCase extends SparkReadTestBase {
                         .option("path", tablePath1.toString())
                         .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
                         .load();
-        assertThat(dataset.collectAsList()).isEmpty();
+        assertThat(dataset.collectAsList().toString()).isEqualTo("[[1,2,1], 
[5,6,3]]");
     }
 
     @Test

Reply via email to