This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new feabeaabe [core][bug] BoundedChecker may check non-existing snapshot at fully read phase (#865)
feabeaabe is described below

commit feabeaabe43b33c865347b1e9319f32359da1a58
Author: yuzelin <[email protected]>
AuthorDate: Thu Apr 13 09:51:01 2023 +0800

    [core][bug] BoundedChecker may check non-existing snapshot at fully read phase (#865)
---
 .../apache/paimon/table/source/DataFilePlan.java   |  9 ++---
 .../table/source/InnerStreamTableScanImpl.java     | 21 ++++++-----
 .../source/snapshot/CompactedStartingScanner.java  |  5 ++-
 .../ContinuousCompactorStartingScanner.java        |  9 ++---
 .../ContinuousFromSnapshotStartingScanner.java     | 12 +++----
 .../ContinuousFromTimestampStartingScanner.java    |  7 ++--
 .../snapshot/ContinuousLatestStartingScanner.java  |  7 ++--
 .../table/source/snapshot/FullStartingScanner.java |  7 ++--
 .../table/source/snapshot/StartingScanner.java     | 42 ++++++++++++++--------
 .../StaticFromSnapshotStartingScanner.java         |  7 ++--
 .../StaticFromTimestampStartingScanner.java        |  5 ++-
 .../paimon/table/source/StreamTableScanTest.java   | 19 ++++++++++
 .../snapshot/CompactedStartingScannerTest.java     | 12 ++++---
 .../ContinuousCompactorStartingScannerTest.java    |  9 ++---
 ...ContinuousFromTimestampStartingScannerTest.java | 16 +++++----
 .../ContinuousLatestStartingScannerTest.java       |  9 ++---
 .../snapshot/FullCompactedStartingScannerTest.java | 12 ++++---
 .../source/snapshot/FullStartingScannerTest.java   |  8 +++--
 18 files changed, 124 insertions(+), 92 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index 380325b9d..6c113f2df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -20,8 +20,6 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,7 +38,10 @@ public class DataFilePlan implements TableScan.Plan {
         return new ArrayList<>(splits);
     }
 
-    public static DataFilePlan fromResult(@Nullable StartingScanner.Result 
result) {
-        return new DataFilePlan(result == null ? Collections.emptyList() : 
result.splits());
+    public static DataFilePlan fromResult(StartingScanner.Result result) {
+        return new DataFilePlan(
+                result instanceof StartingScanner.ScannedResult
+                        ? ((StartingScanner.ScannedResult) result).splits()
+                        : Collections.emptyList());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 46c01b219..5ec559cde 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -51,7 +51,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     private StartingScanner startingScanner;
     private FollowUpScanner followUpScanner;
     private BoundedChecker boundedChecker;
-    private boolean isEnd = false;
+    private boolean isFullPhaseEnd = false;
     @Nullable private Long nextSnapshotId;
 
     public InnerStreamTableScanImpl(
@@ -92,19 +92,24 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
 
     private Plan tryFirstPlan() {
         StartingScanner.Result result = startingScanner.scan(snapshotManager, 
snapshotSplitReader);
-        if (result != null) {
-            long snapshotId = result.snapshotId();
-            nextSnapshotId = snapshotId + 1;
-            if 
(boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshotId))) {
-                isEnd = true;
-            }
+        if (result instanceof StartingScanner.ScannedResult) {
+            long currentSnapshotId = ((StartingScanner.ScannedResult) 
result).currentSnapshotId();
+            nextSnapshotId = currentSnapshotId + 1;
+            isFullPhaseEnd =
+                    
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
+        } else if (result instanceof StartingScanner.NextSnapshot) {
+            nextSnapshotId = ((StartingScanner.NextSnapshot) 
result).nextSnapshotId();
+            isFullPhaseEnd =
+                    snapshotManager.snapshotExists(nextSnapshotId - 1)
+                            && boundedChecker.shouldEndInput(
+                                    snapshotManager.snapshot(nextSnapshotId - 
1));
         }
         return DataFilePlan.fromResult(result);
     }
 
     private Plan nextPlan() {
         while (true) {
-            if (isEnd) {
+            if (isFullPhaseEnd) {
                 throw new EndOfScanException();
             }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index adb9d2fee..9515d8ee0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -33,14 +33,13 @@ public class CompactedStartingScanner implements 
StartingScanner {
     private static final Logger LOG = 
LoggerFactory.getLogger(CompactedStartingScanner.class);
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long startingSnapshotId = pick(snapshotManager);
         if (startingSnapshotId == null) {
             startingSnapshotId = snapshotManager.latestSnapshotId();
             if (startingSnapshotId == null) {
                 LOG.debug("There is currently no snapshot. Wait for the 
snapshot generation.");
-                return null;
+                return new NoSnapshot();
             } else {
                 LOG.debug(
                         "No compact snapshot found, reading from the latest 
snapshot {}.",
@@ -48,7 +47,7 @@ public class CompactedStartingScanner implements 
StartingScanner {
             }
         }
 
-        return new Result(
+        return new ScannedResult(
                 startingSnapshotId,
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index e92291e5b..907f9a896 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /** {@link StartingScanner} used internally for stand-alone streaming compact 
job sources. */
 public class ContinuousCompactorStartingScanner implements StartingScanner {
 
@@ -33,26 +31,25 @@ public class ContinuousCompactorStartingScanner implements 
StartingScanner {
             LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (latestSnapshotId == null || earliestSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot 
generation.");
-            return null;
+            return new NoSnapshot();
         }
 
         for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
                 LOG.debug("Found latest compact snapshot {}, reading from the 
next snapshot.", id);
-                return new Result(id);
+                return new NextSnapshot(id + 1);
             }
         }
 
         LOG.debug(
                 "No compact snapshot found, reading from the earliest snapshot 
{}.",
                 earliestSnapshotId);
-        return new Result(earliestSnapshotId - 1);
+        return new NextSnapshot(earliestSnapshotId);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 355bd6d6f..aca3f55fc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 /**
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
  * streaming read.
@@ -36,15 +34,13 @@ public class ContinuousFromSnapshotStartingScanner 
implements StartingScanner {
     }
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (earliestSnapshotId == null) {
-            return null;
+            return new NoSnapshot();
         }
-        // We should use `snapshotId - 1` here to start to scan delta data 
from specific snapshot
-        // id. If the snapshotId < earliestSnapshotId, start to scan from the 
earliest.
-        return new Result(
-                snapshotId >= earliestSnapshotId ? snapshotId - 1 : 
earliestSnapshotId - 1);
+        // We should return the specified snapshot as next snapshot to 
indicate to scan delta data
+        // from it. If the snapshotId < earliestSnapshotId, start from the 
earliest.
+        return new NextSnapshot(Math.max(snapshotId, earliestSnapshotId));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 4f6f318e1..985752ae5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /**
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
  * streaming read.
@@ -42,13 +40,12 @@ public class ContinuousFromTimestampStartingScanner 
implements StartingScanner {
     }
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long startingSnapshotId = 
snapshotManager.earlierThanTimeMills(startupMillis);
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return null;
+            return new NoSnapshot();
         }
-        return new Result(startingSnapshotId);
+        return new NextSnapshot(startingSnapshotId + 1);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index f0c5c6fe1..099434be7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST} 
startup mode of a
  * streaming read.
@@ -36,13 +34,12 @@ public class ContinuousLatestStartingScanner implements 
StartingScanner {
             LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot 
generation.");
-            return null;
+            return new NoSnapshot();
         }
-        return new Result(startingSnapshotId);
+        return new NextSnapshot(startingSnapshotId + 1);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index fdc93ab5d..6d11f1eb7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -25,22 +25,19 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
 public class FullStartingScanner implements StartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FullStartingScanner.class);
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return null;
+            return new NoSnapshot();
         }
-        return new Result(
+        return new ScannedResult(
                 startingSnapshotId,
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 3e850de3c..16fc46aa0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -22,38 +22,52 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
-import java.util.Collections;
 import java.util.List;
 
 /** Helper class for the first planning of {@link TableScan}. */
 public interface StartingScanner {
 
-    @Nullable
     Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader);
 
     /** Scan result of {@link #scan}. */
-    class Result {
+    interface Result {}
 
-        private final long snapshotId;
-        private final List<DataSplit> splits;
+    /** Currently, there is no snapshot, need to wait for the snapshot to be 
generated. */
+    class NoSnapshot implements Result {}
 
-        public Result(long snapshotId) {
-            this(snapshotId, Collections.emptyList());
-        }
+    /** Result with scanned snapshot. Next snapshot should be the current 
snapshot plus 1. */
+    class ScannedResult implements Result {
+        private final long currentSnapshotId;
+        private final List<DataSplit> splits;
 
-        public Result(long snapshotId, List<DataSplit> splits) {
-            this.snapshotId = snapshotId;
+        public ScannedResult(long currentSnapshotId, List<DataSplit> splits) {
+            this.currentSnapshotId = currentSnapshotId;
             this.splits = splits;
         }
 
-        public long snapshotId() {
-            return snapshotId;
+        public long currentSnapshotId() {
+            return currentSnapshotId;
         }
 
         public List<DataSplit> splits() {
             return splits;
         }
     }
+
+    /**
+     * Return the next snapshot for followup scanning. The current snapshot is 
not scanned (even
+     * doesn't exist), so there are no splits.
+     */
+    class NextSnapshot implements Result {
+
+        private final long nextSnapshotId;
+
+        public NextSnapshot(long nextSnapshotId) {
+            this.nextSnapshotId = nextSnapshotId;
+        }
+
+        public long nextSnapshotId() {
+            return nextSnapshotId;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 5ccf82563..1b796b923 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -22,8 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 /**
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
  * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
@@ -36,13 +34,12 @@ public class StaticFromSnapshotStartingScanner implements 
StartingScanner {
     }
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         if (snapshotManager.earliestSnapshotId() == null
                 || snapshotId < snapshotManager.earliestSnapshotId()) {
-            return null;
+            return new NoSnapshot();
         }
-        return new Result(
+        return new ScannedResult(
                 snapshotId,
                 
snapshotSplitReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).splits());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index f4437e41b..f90f81c6f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -44,16 +44,15 @@ public class StaticFromTimestampStartingScanner implements 
StartingScanner {
     }
 
     @Override
-    @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
         Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager, 
startupMillis);
         if (startingSnapshot == null) {
             LOG.debug(
                     "There is currently no snapshot earlier than or equal to 
timestamp[{}]",
                     startupMillis);
-            return null;
+            return new NoSnapshot();
         }
-        return new Result(
+        return new ScannedResult(
                 startingSnapshot.id(),
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index a249b2b2e..a6f4d36b5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -22,8 +22,10 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.snapshot.ScannerTestBase;
 import org.apache.paimon.types.RowKind;
@@ -235,4 +237,21 @@ public class StreamTableScanTest extends ScannerTestBase {
         write.close();
         commit.close();
     }
+
+    @Test
+    public void testStartingFromNonExistingSnapshot() throws Exception {
+        Table table =
+                this.table.copy(
+                        
Collections.singletonMap(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), "0"));
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = writeBuilder.newWrite();
+        StreamTableCommit commit = writeBuilder.newCommit();
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        StreamTableScan scan = table.newReadBuilder().newStreamScan();
+        TableScan.Plan plan = scan.plan();
+        assertThat(plan.splits()).isEmpty();
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 01e40cfea..e16e54678 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -56,8 +56,9 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
 
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(3);
+        StartingScanner.ScannedResult result =
+                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.currentSnapshotId()).isEqualTo(3);
         assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
 
@@ -69,7 +70,8 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -88,7 +90,9 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
         CompactedStartingScanner scanner = new CompactedStartingScanner();
 
         // No compact snapshot found, reading from the latest snapshot
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader).snapshotId()).isEqualTo(1);
+        StartingScanner.ScannedResult result =
+                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.currentSnapshotId()).isEqualTo(1);
 
         write.close();
         commit.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index c62fa0514..1153c18fd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -59,9 +59,9 @@ public class ContinuousCompactorStartingScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
         ContinuousCompactorStartingScanner scanner = new 
ContinuousCompactorStartingScanner();
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(3);
-        assertThat(result.splits()).isEmpty();
+        StartingScanner.NextSnapshot result =
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.nextSnapshotId()).isEqualTo(4);
 
         write.close();
         commit.close();
@@ -71,6 +71,7 @@ public class ContinuousCompactorStartingScannerTest extends 
ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousCompactorStartingScanner scanner = new 
ContinuousCompactorStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 69dcb55c4..2c6fe2610 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -59,9 +59,9 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
 
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(timestamp);
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(2);
-        assertThat(getResult(table.newRead(), 
toSplits(result.splits()))).isEmpty();
+        StartingScanner.NextSnapshot result =
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.nextSnapshotId()).isEqualTo(3);
 
         write.close();
         commit.close();
@@ -72,7 +72,8 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousFromTimestampStartingScanner scanner =
                 new 
ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -92,9 +93,10 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
 
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(timestamp);
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(0);
-        assertThat(getResult(table.newRead(), 
toSplits(result.splits()))).isEmpty();
+        StartingScanner.NextSnapshot result =
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        // next snapshot
+        assertThat(result.nextSnapshotId()).isEqualTo(1);
 
         write.close();
         commit.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index 33ea201b7..cc6d43e69 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -49,9 +49,9 @@ public class ContinuousLatestStartingScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
         ContinuousLatestStartingScanner scanner = new 
ContinuousLatestStartingScanner();
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(2);
-        assertThat(getResult(table.newRead(), 
toSplits(result.splits()))).isEmpty();
+        StartingScanner.NextSnapshot result =
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.nextSnapshotId()).isEqualTo(3);
 
         write.close();
         commit.close();
@@ -61,6 +61,7 @@ public class ContinuousLatestStartingScannerTest extends 
ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousLatestStartingScanner scanner = new 
ContinuousLatestStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index ba180684e..b5fba4830 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -46,8 +46,9 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
 
         FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(8);
+        StartingScanner.ScannedResult result =
+                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.currentSnapshotId()).isEqualTo(8);
 
         write.close();
         commit.close();
@@ -57,7 +58,8 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -86,7 +88,9 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
         FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
 
         // No compact snapshot found, reading from the latest snapshot
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader).snapshotId()).isEqualTo(4);
+        StartingScanner.ScannedResult result =
+                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.currentSnapshotId()).isEqualTo(4);
 
         write.close();
         commit.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 7ce631d20..9737c35b5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -51,8 +51,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
         FullStartingScanner scanner = new FullStartingScanner();
-        StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(2);
+        StartingScanner.ScannedResult result =
+                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotSplitReader);
+        assertThat(result.currentSnapshotId()).isEqualTo(2);
         assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
 
@@ -64,6 +65,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         FullStartingScanner scanner = new FullStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+                .isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }

Reply via email to