This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 55957987e7430fdef81bdbd85f8b5db8313a839d
Author: JunZhang <[email protected]>
AuthorDate: Fri Dec 29 18:37:43 2023 +0800

    [core] fix streaming read empty table in latest mode (#2589)
    
    Co-authored-by: yuzelin <[email protected]>
---
 .../snapshot/ContinuousLatestStartingScanner.java       | 12 +++++++++---
 .../apache/paimon/flink/ContinuousFileStoreITCase.java  | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index c1a6054ae..96c3e700f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -40,11 +41,16 @@ public class ContinuousLatestStartingScanner extends 
AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader snapshotReader) {
-        Long startingSnapshotId = snapshotManager.latestSnapshotId();
-        if (startingSnapshotId == null) {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (latestSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot 
generation.");
             return new NoSnapshot();
         }
-        return new NextSnapshot(startingSnapshotId + 1);
+
+        // If there's no snapshot before the reading job starts,
+        // then the first snapshot should be considered as an incremental 
snapshot
+        long nextSnapshot =
+                startingSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+        return new NextSnapshot(nextSnapshot);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index eda44b0bc..767f2bfe6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -216,6 +216,23 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         iterator.close();
     }
 
+    @Test
+    public void testContinuousLatestStartingFromEmpty() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter("SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='latest') */"));
+
+        sql("INSERT INTO T1 VALUES ('1', 'Hello', 'World')");
+        sql("INSERT INTO T1 VALUES ('2', 'Apache', 'Paimon')");
+        sql("INSERT INTO T1 VALUES ('3', 'C', 'c')");
+
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "Hello", "World"),
+                        Row.of("2", "Apache", "Paimon"),
+                        Row.of("3", "C", "c"));
+    }
+
     @Test
     public void testContinuousFromTimestamp() throws Exception {
         String sql =

Reply via email to