This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f2c6769b3 [core] Supports StartingContext and mini-refactor 
StartingScanner (#1955)
f2c6769b3 is described below

commit f2c6769b31802123a9e3e7e7f87f9eba82117c7e
Author: Yann Byron <[email protected]>
AuthorDate: Thu Sep 7 13:33:43 2023 +0800

    [core] Supports StartingContext and mini-refactor StartingScanner (#1955)
---
 .../table/source/AbstractInnerTableScan.java       | 44 ++++++++++++++--------
 .../table/source/InnerStreamTableScanImpl.java     | 32 ++++++++++++----
 .../paimon/table/source/InnerTableScanImpl.java    |  2 +-
 .../paimon/table/source/StreamTableScan.java       |  3 ++
 ...ngScanner.java => AbstractStartingScanner.java} | 28 ++++++++------
 .../source/snapshot/CompactedStartingScanner.java  | 18 +++++++--
 .../ContinuousCompactorStartingScanner.java        |  9 ++++-
 .../ContinuousFromSnapshotFullStartingScanner.java | 18 ++++++---
 .../ContinuousFromSnapshotStartingScanner.java     | 13 +++----
 .../ContinuousFromTimestampStartingScanner.java    | 18 +++++++--
 .../snapshot/ContinuousLatestStartingScanner.java  | 18 ++++++++-
 .../snapshot/FullCompactedStartingScanner.java     | 39 +++++++++++++++++--
 .../table/source/snapshot/FullStartingScanner.java | 14 ++++++-
 .../snapshot/IncrementalStartingScanner.java       | 27 ++++++-------
 .../snapshot/IncrementalTagStartingScanner.java    | 17 +++++++--
 .../IncrementalTimeStampStartingScanner.java       | 25 +++++++-----
 ...otStartingScanner.java => StartingContext.java} | 38 ++++++++++---------
 .../table/source/snapshot/StartingScanner.java     |  5 ++-
 .../StaticFromSnapshotStartingScanner.java         | 19 ++++++----
 .../snapshot/StaticFromTagStartingScanner.java     | 25 ++++++++----
 .../StaticFromTimestampStartingScanner.java        | 16 +++++---
 .../apache/paimon/table/system/AuditLogTable.java  |  6 +++
 .../snapshot/CompactedStartingScannerTest.java     | 13 +++----
 .../ContinuousCompactorStartingScannerTest.java    | 11 +++---
 ...ContinuousFromTimestampStartingScannerTest.java | 14 +++----
 .../ContinuousLatestStartingScannerTest.java       | 11 +++---
 .../snapshot/FullCompactedStartingScannerTest.java | 13 +++----
 .../source/snapshot/FullStartingScannerTest.java   |  9 ++---
 .../snapshot/StaticFromTagStartingScannerTest.java |  8 ++--
 .../source/ContinuousFileSplitEnumeratorTest.java  |  6 +++
 30 files changed, 343 insertions(+), 176 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 034a333b9..2ec3125bd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -43,6 +43,7 @@ import 
org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.List;
 import java.util.Optional;
@@ -77,6 +78,7 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
     }
 
     protected StartingScanner createStartingScanner(boolean isStreaming) {
+        SnapshotManager snapshotManager = snapshotReader.snapshotManager();
         CoreOptions.StreamingCompactionType type =
                 options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
         switch (type) {
@@ -85,11 +87,11 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
                     checkArgument(
                             isStreaming,
                             "Set 'streaming-compact' in batch mode. This is 
unexpected.");
-                    return new ContinuousCompactorStartingScanner();
+                    return new 
ContinuousCompactorStartingScanner(snapshotManager);
                 }
             case BUCKET_UNAWARE:
                 {
-                    return new FullStartingScanner();
+                    return new FullStartingScanner(snapshotManager);
                 }
         }
 
@@ -99,18 +101,19 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
             ConsumerManager consumerManager = snapshotReader.consumerManager();
             Optional<Consumer> consumer = consumerManager.consumer(consumerId);
             if (consumer.isPresent()) {
-                return new 
ContinuousFromSnapshotStartingScanner(consumer.get().nextSnapshot());
+                return new ContinuousFromSnapshotStartingScanner(
+                        snapshotManager, consumer.get().nextSnapshot());
             }
         }
 
         CoreOptions.StartupMode startupMode = options.startupMode();
         switch (startupMode) {
             case LATEST_FULL:
-                return new FullStartingScanner();
+                return new FullStartingScanner(snapshotManager);
             case LATEST:
                 return isStreaming
-                        ? new ContinuousLatestStartingScanner()
-                        : new FullStartingScanner();
+                        ? new ContinuousLatestStartingScanner(snapshotManager)
+                        : new FullStartingScanner(snapshotManager);
             case COMPACTED_FULL:
                 if (options.changelogProducer() == 
ChangelogProducer.FULL_COMPACTION
                         || 
options.toConfiguration().contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -118,42 +121,51 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
                             options.toConfiguration()
                                     .getOptional(FULL_COMPACTION_DELTA_COMMITS)
                                     .orElse(1);
-                    return new FullCompactedStartingScanner(deltaCommits);
+                    return new FullCompactedStartingScanner(snapshotManager, 
deltaCommits);
                 } else {
-                    return new CompactedStartingScanner();
+                    return new CompactedStartingScanner(snapshotManager);
                 }
             case FROM_TIMESTAMP:
                 Long startupMillis = options.scanTimestampMills();
                 return isStreaming
-                        ? new 
ContinuousFromTimestampStartingScanner(startupMillis)
-                        : new 
StaticFromTimestampStartingScanner(startupMillis);
+                        ? new 
ContinuousFromTimestampStartingScanner(snapshotManager, startupMillis)
+                        : new 
StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
             case FROM_SNAPSHOT:
                 if (options.scanSnapshotId() != null) {
                     return isStreaming
-                            ? new 
ContinuousFromSnapshotStartingScanner(options.scanSnapshotId())
-                            : new 
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+                            ? new ContinuousFromSnapshotStartingScanner(
+                                    snapshotManager, options.scanSnapshotId())
+                            : new StaticFromSnapshotStartingScanner(
+                                    snapshotManager, options.scanSnapshotId());
                 } else {
                     checkArgument(!isStreaming, "Cannot scan from tag in 
streaming mode.");
-                    return new 
StaticFromTagStartingScanner(options().scanTagName());
+                    return new StaticFromTagStartingScanner(
+                            snapshotManager, options().scanTagName());
                 }
             case FROM_SNAPSHOT_FULL:
                 return isStreaming
-                        ? new 
ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId())
-                        : new 
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+                        ? new ContinuousFromSnapshotFullStartingScanner(
+                                snapshotManager, options.scanSnapshotId())
+                        : new StaticFromSnapshotStartingScanner(
+                                snapshotManager, options.scanSnapshotId());
             case INCREMENTAL:
                 checkArgument(!isStreaming, "Cannot read incremental in 
streaming mode.");
                 Pair<String, String> incrementalBetween = 
options.incrementalBetween();
                 if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) 
!= null) {
                     try {
                         return new IncrementalStartingScanner(
+                                snapshotManager,
                                 Long.parseLong(incrementalBetween.getLeft()),
                                 Long.parseLong(incrementalBetween.getRight()));
                     } catch (NumberFormatException e) {
                         return new IncrementalTagStartingScanner(
-                                incrementalBetween.getLeft(), 
incrementalBetween.getRight());
+                                snapshotManager,
+                                incrementalBetween.getLeft(),
+                                incrementalBetween.getRight());
                     }
                 } else {
                     return new IncrementalTimeStampStartingScanner(
+                            snapshotManager,
                             Long.parseLong(incrementalBetween.getLeft()),
                             Long.parseLong(incrementalBetween.getRight()));
                 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index eb8566b65..4ff0b3be0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
 import org.apache.paimon.utils.SnapshotManager;
@@ -51,9 +52,11 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     private final boolean supportStreamingReadOverwrite;
     private final DefaultValueAssigner defaultValueAssigner;
 
+    private boolean inited = false;
     private StartingScanner startingScanner;
     private FollowUpScanner followUpScanner;
     private BoundedChecker boundedChecker;
+
     private boolean isFullPhaseEnd = false;
     @Nullable private Long nextSnapshotId;
 
@@ -76,8 +79,28 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
         return this;
     }
 
+    @Override
+    public StartingContext startingContext() {
+        if (!inited) {
+            initScanner();
+        }
+        return startingScanner.startingContext();
+    }
+
     @Override
     public RichPlan plan() {
+        if (!inited) {
+            initScanner();
+        }
+
+        if (nextSnapshotId == null) {
+            return tryFirstPlan();
+        } else {
+            return nextPlan();
+        }
+    }
+
+    private void initScanner() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(true);
         }
@@ -87,16 +110,11 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
         if (boundedChecker == null) {
             boundedChecker = createBoundedChecker();
         }
-
-        if (nextSnapshotId == null) {
-            return tryFirstPlan();
-        } else {
-            return nextPlan();
-        }
+        inited = true;
     }
 
     private RichPlan tryFirstPlan() {
-        StartingScanner.Result result = startingScanner.scan(snapshotManager, 
snapshotReader);
+        StartingScanner.Result result = startingScanner.scan(snapshotReader);
         if (result instanceof ScannedResult) {
             ScannedResult scannedResult = (ScannedResult) result;
             long currentSnapshotId = scannedResult.currentSnapshotId();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index dc01bd429..cb4e39b23 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -59,7 +59,7 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
 
         if (hasNext) {
             hasNext = false;
-            StartingScanner.Result result = 
startingScanner.scan(snapshotManager, snapshotReader);
+            StartingScanner.Result result = 
startingScanner.scan(snapshotReader);
             return DataFilePlan.fromResult(result);
         } else {
             throw new EndOfScanException();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 869937137..5c8932059 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.utils.Restorable;
 
 import javax.annotation.Nullable;
@@ -33,6 +34,8 @@ import javax.annotation.Nullable;
 @Public
 public interface StreamTableScan extends TableScan, Restorable<Long> {
 
+    StartingContext startingContext();
+
     @Override
     RichPlan plan();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
similarity index 57%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index a03a707be..d711781d7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,26 +18,30 @@
 
 package org.apache.paimon.table.source.snapshot;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/** The abstract class for StartingScanner. */
+public abstract class AbstractStartingScanner implements StartingScanner {
 
-/** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner implements StartingScanner {
+    protected final SnapshotManager snapshotManager;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FullStartingScanner.class);
+    protected Long startingSnapshotId = null;
+
+    AbstractStartingScanner(SnapshotManager snapshotManager) {
+        this.snapshotManager = snapshotManager;
+    }
+
+    protected ScanMode startingScanMode() {
+        return ScanMode.DELTA;
+    }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
-        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+    public StartingContext startingContext() {
         if (startingSnapshotId == null) {
-            LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return new NoSnapshot();
+            return StartingContext.EMPTY;
+        } else {
+            return new StartingContext(startingSnapshotId, startingScanMode() 
== ScanMode.ALL);
         }
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index a1d9d90a0..a15954627 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -29,13 +29,23 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 /** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
-public class CompactedStartingScanner implements StartingScanner {
+public class CompactedStartingScanner extends AbstractStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompactedStartingScanner.class);
 
+    public CompactedStartingScanner(SnapshotManager snapshotManager) {
+        super(snapshotManager);
+        this.startingSnapshotId = pick();
+    }
+
+    @Override
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
+
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
-        Long startingSnapshotId = pick(snapshotManager);
+    public Result scan(SnapshotReader snapshotReader) {
+        Long startingSnapshotId = pick();
         if (startingSnapshotId == null) {
             startingSnapshotId = snapshotManager.latestSnapshotId();
             if (startingSnapshotId == null) {
@@ -53,7 +63,7 @@ public class CompactedStartingScanner implements 
StartingScanner {
     }
 
     @Nullable
-    protected Long pick(SnapshotManager snapshotManager) {
+    protected Long pick() {
         return snapshotManager.pickOrLatest(s -> s.commitKind() == 
Snapshot.CommitKind.COMPACT);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index 4674002a8..5f24366ca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -25,13 +25,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** {@link StartingScanner} used internally for stand-alone streaming compact 
job sources. */
-public class ContinuousCompactorStartingScanner implements StartingScanner {
+public class ContinuousCompactorStartingScanner extends 
AbstractStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
 
+    public ContinuousCompactorStartingScanner(SnapshotManager snapshotManager) 
{
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotManager.earliestSnapshotId();
+    }
+
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public Result scan(SnapshotReader snapshotReader) {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (latestSnapshotId == null || earliestSnapshotId == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index 0a1a2c195..29bd0b7cd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -26,20 +26,26 @@ import org.apache.paimon.utils.SnapshotManager;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
  * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
  */
-public class ContinuousFromSnapshotFullStartingScanner implements 
StartingScanner {
-    private final long snapshotId;
+public class ContinuousFromSnapshotFullStartingScanner extends 
AbstractStartingScanner {
 
-    public ContinuousFromSnapshotFullStartingScanner(long snapshotId) {
-        this.snapshotId = snapshotId;
+    public ContinuousFromSnapshotFullStartingScanner(
+            SnapshotManager snapshotManager, long snapshotId) {
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotId;
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
+
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (earliestSnapshotId == null) {
             return new NoSnapshot();
         }
-        long ceiledSnapshotId = Math.max(snapshotId, earliestSnapshotId);
+        long ceiledSnapshotId = Math.max(startingSnapshotId, 
earliestSnapshotId);
         return StartingScanner.fromPlan(
                 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 8f7319ac2..c966de58c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -25,22 +25,21 @@ import org.apache.paimon.utils.SnapshotManager;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
  * streaming read.
  */
-public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
+public class ContinuousFromSnapshotStartingScanner extends 
AbstractStartingScanner {
 
-    private final long snapshotId;
-
-    public ContinuousFromSnapshotStartingScanner(long snapshotId) {
-        this.snapshotId = snapshotId;
+    public ContinuousFromSnapshotStartingScanner(SnapshotManager 
snapshotManager, long snapshotId) {
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotId;
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public Result scan(SnapshotReader snapshotReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (earliestSnapshotId == null) {
             return new NoSnapshot();
         }
         // We should return the specified snapshot as next snapshot to 
indicate to scan delta data
         // from it. If the snapshotId < earliestSnapshotId, start from the 
earliest.
-        return new NextSnapshot(Math.max(snapshotId, earliestSnapshotId));
+        return new NextSnapshot(Math.max(startingSnapshotId, 
earliestSnapshotId));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index d72233519..6113773ff 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -28,19 +28,31 @@ import org.slf4j.LoggerFactory;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
  * streaming read.
  */
-public class ContinuousFromTimestampStartingScanner implements StartingScanner 
{
+public class ContinuousFromTimestampStartingScanner extends 
AbstractStartingScanner {
 
     private static final Logger LOG =
             
LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class);
 
     private final long startupMillis;
 
-    public ContinuousFromTimestampStartingScanner(long startupMillis) {
+    public ContinuousFromTimestampStartingScanner(
+            SnapshotManager snapshotManager, long startupMillis) {
+        super(snapshotManager);
         this.startupMillis = startupMillis;
+        this.startingSnapshotId = 
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public StartingContext startingContext() {
+        if (startingSnapshotId == null) {
+            return StartingContext.EMPTY;
+        } else {
+            return new StartingContext(startingSnapshotId + 1, false);
+        }
+    }
+
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = 
snapshotManager.earlierThanTimeMills(startupMillis);
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 7a1c49c34..423181b37 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -28,13 +28,27 @@ import org.slf4j.LoggerFactory;
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST} 
startup mode of a
  * streaming read.
  */
-public class ContinuousLatestStartingScanner implements StartingScanner {
+public class ContinuousLatestStartingScanner extends AbstractStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
 
+    public ContinuousLatestStartingScanner(SnapshotManager snapshotManager) {
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotManager.latestSnapshotId();
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (startingSnapshotId == null) {
+            return StartingContext.EMPTY;
+        } else {
+            return new StartingContext(startingSnapshotId + 1, false);
+        }
+    }
+
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot 
generation.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index 31df3c8b9..5015d56c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -21,25 +21,32 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions.StartupMode;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup 
mode with
  * 'full-compaction.delta-commits'.
  */
-public class FullCompactedStartingScanner extends CompactedStartingScanner {
+public class FullCompactedStartingScanner extends AbstractStartingScanner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
 
     private final int deltaCommits;
 
-    public FullCompactedStartingScanner(int deltaCommits) {
+    public FullCompactedStartingScanner(SnapshotManager snapshotManager, int 
deltaCommits) {
+        super(snapshotManager);
         this.deltaCommits = deltaCommits;
+        this.startingSnapshotId = pick();
     }
 
-    @Override
     @Nullable
-    protected Long pick(SnapshotManager snapshotManager) {
+    protected Long pick() {
         return snapshotManager.pickOrLatest(this::picked);
     }
 
@@ -49,6 +56,30 @@ public class FullCompactedStartingScanner extends 
CompactedStartingScanner {
                 && isFullCompactedIdentifier(identifier, deltaCommits);
     }
 
+    @Override
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
+
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
+        Long startingSnapshotId = pick();
+        if (startingSnapshotId == null) {
+            startingSnapshotId = snapshotManager.latestSnapshotId();
+            if (startingSnapshotId == null) {
+                LOG.debug("There is currently no snapshot. Wait for the 
snapshot generation.");
+                return new NoSnapshot();
+            } else {
+                LOG.debug(
+                        "No compact snapshot found, reading from the latest 
snapshot {}.",
+                        startingSnapshotId);
+            }
+        }
+
+        return StartingScanner.fromPlan(
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+    }
+
     public static boolean isFullCompactedIdentifier(long identifier, int 
deltaCommits) {
         return identifier % deltaCommits == 0 || identifier == Long.MAX_VALUE;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index a03a707be..c177be7ea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -26,12 +26,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner implements StartingScanner {
+public class FullStartingScanner extends AbstractStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FullStartingScanner.class);
 
+    public FullStartingScanner(SnapshotManager snapshotManager) {
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotManager.latestSnapshotId();
+    }
+
+    @Override
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
+
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index fbdf8a858..4e750688b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -35,26 +35,21 @@ import java.util.List;
 import java.util.Map;
 
 /** {@link StartingScanner} for incremental changes by snapshot. */
-public class IncrementalStartingScanner implements StartingScanner {
+public class IncrementalStartingScanner extends AbstractStartingScanner {
 
-    private long start;
-    private long end;
+    private long endingSnapshotId;
 
-    public IncrementalStartingScanner(long start, long end) {
-        this.start = start;
-        this.end = end;
+    public IncrementalStartingScanner(SnapshotManager snapshotManager, long 
start, long end) {
+        super(snapshotManager);
+        this.startingSnapshotId = start;
+        this.endingSnapshotId = end;
     }
 
     @Override
-    public Result scan(SnapshotManager manager, SnapshotReader reader) {
-        long earliestSnapshotId = manager.earliestSnapshotId();
-        long latestSnapshotId = manager.latestSnapshotId();
-        start = (start < earliestSnapshotId) ? earliestSnapshotId - 1 : start;
-        end = (end > latestSnapshotId) ? latestSnapshotId : end;
-
+    public Result scan(SnapshotReader reader) {
         Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
HashMap<>();
-        for (long i = start + 1; i < end + 1; i++) {
-            List<DataSplit> splits = readDeltaSplits(reader, 
manager.snapshot(i));
+        for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
+            List<DataSplit> splits = readDeltaSplits(reader, 
snapshotManager.snapshot(i));
             for (DataSplit split : splits) {
                 grouped.computeIfAbsent(
                                 Pair.of(split.partition(), split.bucket()), k 
-> new ArrayList<>())
@@ -70,7 +65,7 @@ public class IncrementalStartingScanner implements 
StartingScanner {
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
                 result.add(
                         DataSplit.builder()
-                                .withSnapshot(end)
+                                .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .withDataFiles(files)
@@ -87,7 +82,7 @@ public class IncrementalStartingScanner implements 
StartingScanner {
 
                     @Override
                     public Long snapshotId() {
-                        return end;
+                        return endingSnapshotId;
                     }
 
                     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 242db6b19..2cdf5bff9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -23,19 +23,28 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 /** {@link StartingScanner} for incremental changes by tag. */
-public class IncrementalTagStartingScanner implements StartingScanner {
+public class IncrementalTagStartingScanner extends AbstractStartingScanner {
 
     private final String start;
     private final String end;
 
-    public IncrementalTagStartingScanner(String start, String end) {
+    public IncrementalTagStartingScanner(
+            SnapshotManager snapshotManager, String start, String end) {
+        super(snapshotManager);
         this.start = start;
         this.end = end;
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+        Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
+        if (startingSnapshot != null) {
+            this.startingSnapshotId = startingSnapshot.id();
+        }
     }
 
     @Override
-    public Result scan(SnapshotManager manager, SnapshotReader reader) {
-        TagManager tagManager = new TagManager(manager.fileIO(), 
manager.tablePath());
+    public Result scan(SnapshotReader reader) {
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         Snapshot tag1 = tagManager.taggedSnapshot(start);
         Snapshot tag2 = tagManager.taggedSnapshot(end);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
index ed8613a26..6e50903b0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -22,31 +22,36 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.utils.SnapshotManager;
 
 /** {@link StartingScanner} for incremental changes by timestamp. */
-public class IncrementalTimeStampStartingScanner implements StartingScanner {
+public class IncrementalTimeStampStartingScanner extends 
AbstractStartingScanner {
 
     private final long startTimestamp;
     private final long endTimestamp;
 
-    public IncrementalTimeStampStartingScanner(long startTimestamp, long 
endTimestamp) {
+    public IncrementalTimeStampStartingScanner(
+            SnapshotManager snapshotManager, long startTimestamp, long 
endTimestamp) {
+        super(snapshotManager);
         this.startTimestamp = startTimestamp;
         this.endTimestamp = endTimestamp;
+        Snapshot startingSnapshot = 
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+        if (startingSnapshot != null) {
+            this.startingSnapshotId = startingSnapshot.id();
+        }
     }
 
     @Override
-    public Result scan(SnapshotManager manager, SnapshotReader reader) {
-        Snapshot earliestSnapshot = 
manager.snapshot(manager.earliestSnapshotId());
-        Snapshot latestSnapshot = manager.latestSnapshot();
+    public Result scan(SnapshotReader reader) {
+        Snapshot earliestSnapshot = 
snapshotManager.snapshot(snapshotManager.earliestSnapshotId());
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
         if (startTimestamp > latestSnapshot.timeMillis()
                 || endTimestamp < earliestSnapshot.timeMillis()) {
             return new NoSnapshot();
         }
-        Snapshot startSnapshot = 
manager.earlierOrEqualTimeMills(startTimestamp);
         Long startSnapshotId =
-                (startSnapshot == null) ? earliestSnapshot.id() - 1 : 
startSnapshot.id();
-        Snapshot endSnapshot = manager.earlierOrEqualTimeMills(endTimestamp);
+                (startingSnapshotId == null) ? earliestSnapshot.id() - 1 : 
startingSnapshotId;
+        Snapshot endSnapshot = 
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
         Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : 
endSnapshot.id();
         IncrementalStartingScanner incrementalStartingScanner =
-                new IncrementalStartingScanner(startSnapshotId, endSnapshotId);
-        return incrementalStartingScanner.scan(manager, reader);
+                new IncrementalStartingScanner(snapshotManager, 
startSnapshotId, endSnapshotId);
+        return incrementalStartingScanner.scan(reader);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
similarity index 50%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
index 1157e22d2..9204f4d47 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
@@ -18,28 +18,30 @@
 
 package org.apache.paimon.table.source.snapshot;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.utils.SnapshotManager;
+/** That contains some information that will be used out of StartingScanner. */
+public class StartingContext {
+    /**
+     * Notice: The snapshot ID is the initial one corresponding to the 
StartScanner configuration,
+     * not necessarily the snapshot ID at the time of the actual scan. E.g, in
+     * ContinuousFromSnapshotFullStartingScanner, this snapshot ID used in the 
first scan is the
+     * bigger one between the configured one and the earliest one.
+     */
+    private final Long snapshotId;
 
-/**
- * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
- * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
- */
-public class StaticFromSnapshotStartingScanner implements StartingScanner {
-    private final long snapshotId;
+    private final Boolean scanFullSnapshot;
 
-    public StaticFromSnapshotStartingScanner(long snapshotId) {
+    public StartingContext(Long snapshotId, Boolean scanFullSnapshot) {
         this.snapshotId = snapshotId;
+        this.scanFullSnapshot = scanFullSnapshot;
+    }
+
+    public Long getSnapshotId() {
+        return this.snapshotId;
     }
 
-    @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
-        if (snapshotManager.earliestSnapshotId() == null
-                || snapshotId < snapshotManager.earliestSnapshotId()) {
-            return new NoSnapshot();
-        }
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
+    public Boolean getScanFullSnapshot() {
+        return this.scanFullSnapshot;
     }
+
+    public static final StartingContext EMPTY = new StartingContext(0L, false);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index af44a8b8a..b09bfe298 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -20,14 +20,15 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.List;
 
 /** Helper class for the first planning of {@link TableScan}. */
 public interface StartingScanner {
 
-    Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader);
+    StartingContext startingContext();
+
+    Result scan(SnapshotReader snapshotReader);
 
     /** Scan result of {@link #scan}. */
     interface Result {}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 1157e22d2..97144e8f8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -26,20 +26,25 @@ import org.apache.paimon.utils.SnapshotManager;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
  * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
  */
-public class StaticFromSnapshotStartingScanner implements StartingScanner {
-    private final long snapshotId;
+public class StaticFromSnapshotStartingScanner extends AbstractStartingScanner 
{
 
-    public StaticFromSnapshotStartingScanner(long snapshotId) {
-        this.snapshotId = snapshotId;
+    public StaticFromSnapshotStartingScanner(SnapshotManager snapshotManager, 
long snapshotId) {
+        super(snapshotManager);
+        this.startingSnapshotId = snapshotId;
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
+
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
         if (snapshotManager.earliestSnapshotId() == null
-                || snapshotId < snapshotManager.earliestSnapshotId()) {
+                || startingSnapshotId < snapshotManager.earliestSnapshotId()) {
             return new NoSnapshot();
         }
         return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index b402851a1..20bb3199c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -25,21 +25,32 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 /** {@link StartingScanner} for the {@link CoreOptions#SCAN_TAG_NAME} of a 
batch read. */
-public class StaticFromTagStartingScanner implements StartingScanner {
+public class StaticFromTagStartingScanner extends AbstractStartingScanner {
 
     private final String tagName;
 
-    public StaticFromTagStartingScanner(String tagName) {
+    public StaticFromTagStartingScanner(SnapshotManager snapshotManager, 
String tagName) {
+        super(snapshotManager);
         this.tagName = tagName;
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+        Snapshot snapshot = tagManager.taggedSnapshot(this.tagName);
+        if (snapshot != null) {
+            this.startingSnapshotId = snapshot.id();
+        }
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
-        TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+    public ScanMode startingScanMode() {
+        return ScanMode.ALL;
+    }
 
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
+        if (startingSnapshotId == null) {
+            return new NoSnapshot();
+        }
         return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 315b749df..1c039c0c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -32,28 +32,32 @@ import javax.annotation.Nullable;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
  * batch read.
  */
-public class StaticFromTimestampStartingScanner implements StartingScanner {
+public class StaticFromTimestampStartingScanner extends 
AbstractStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
 
     private final long startupMillis;
 
-    public StaticFromTimestampStartingScanner(long startupMillis) {
+    public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager, 
long startupMillis) {
+        super(snapshotManager);
         this.startupMillis = startupMillis;
+        Snapshot snapshot = timeTravelToTimestamp(snapshotManager, 
startupMillis);
+        if (snapshot != null) {
+            this.startingSnapshotId = snapshot.id();
+        }
     }
 
     @Override
-    public Result scan(SnapshotManager snapshotManager, SnapshotReader 
snapshotReader) {
-        Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager, 
startupMillis);
-        if (startingSnapshot == null) {
+    public Result scan(SnapshotReader snapshotReader) {
+        if (startingSnapshotId == null) {
             LOG.debug(
                     "There is currently no snapshot earlier than or equal to 
timestamp[{}]",
                     startupMillis);
             return new NoSnapshot();
         }
         return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshot.id()).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e8e90df1b..28bba8901 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -45,6 +45,7 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -307,6 +308,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public StartingContext startingContext() {
+            return streamScan.startingContext();
+        }
+
         @Override
         public RichPlan plan() {
             return streamScan.plan();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index d99a5dbcb..60c957300 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -55,9 +55,9 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
 
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        CompactedStartingScanner scanner = new 
CompactedStartingScanner(snapshotManager);
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(3);
         assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
@@ -69,9 +69,8 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+        CompactedStartingScanner scanner = new 
CompactedStartingScanner(snapshotManager);
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -87,11 +86,11 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
 
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        CompactedStartingScanner scanner = new 
CompactedStartingScanner(snapshotManager);
 
         // No compact snapshot found, reading from the latest snapshot
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(1);
 
         write.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index d11565ff5..a55a36f01 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -58,9 +58,10 @@ public class ContinuousCompactorStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
-        ContinuousCompactorStartingScanner scanner = new 
ContinuousCompactorStartingScanner();
+        ContinuousCompactorStartingScanner scanner =
+                new ContinuousCompactorStartingScanner(snapshotManager);
         StartingScanner.NextSnapshot result =
-                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         assertThat(result.nextSnapshotId()).isEqualTo(4);
 
         write.close();
@@ -70,8 +71,8 @@ public class ContinuousCompactorStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        ContinuousCompactorStartingScanner scanner = new 
ContinuousCompactorStartingScanner();
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+        ContinuousCompactorStartingScanner scanner =
+                new ContinuousCompactorStartingScanner(snapshotManager);
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 308e04e60..e9364eeff 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -58,9 +58,9 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
         long timestamp = snapshotManager.snapshot(3).timeMillis();
 
         ContinuousFromTimestampStartingScanner scanner =
-                new ContinuousFromTimestampStartingScanner(timestamp);
+                new ContinuousFromTimestampStartingScanner(snapshotManager, 
timestamp);
         StartingScanner.NextSnapshot result =
-                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         assertThat(result.nextSnapshotId()).isEqualTo(3);
 
         write.close();
@@ -71,9 +71,9 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousFromTimestampStartingScanner scanner =
-                new 
ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+                new ContinuousFromTimestampStartingScanner(
+                        snapshotManager, System.currentTimeMillis());
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -92,9 +92,9 @@ public class ContinuousFromTimestampStartingScannerTest 
extends ScannerTestBase
         long timestamp = snapshotManager.snapshot(1).timeMillis();
 
         ContinuousFromTimestampStartingScanner scanner =
-                new ContinuousFromTimestampStartingScanner(timestamp);
+                new ContinuousFromTimestampStartingScanner(snapshotManager, 
timestamp);
         StartingScanner.NextSnapshot result =
-                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         // next snapshot
         assertThat(result.nextSnapshotId()).isEqualTo(1);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index 25a292c2f..bfd86e584 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -48,9 +48,10 @@ public class ContinuousLatestStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
-        ContinuousLatestStartingScanner scanner = new 
ContinuousLatestStartingScanner();
+        ContinuousLatestStartingScanner scanner =
+                new ContinuousLatestStartingScanner(snapshotManager);
         StartingScanner.NextSnapshot result =
-                (StartingScanner.NextSnapshot) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         assertThat(result.nextSnapshotId()).isEqualTo(3);
 
         write.close();
@@ -60,8 +61,8 @@ public class ContinuousLatestStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        ContinuousLatestStartingScanner scanner = new 
ContinuousLatestStartingScanner();
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+        ContinuousLatestStartingScanner scanner =
+                new ContinuousLatestStartingScanner(snapshotManager);
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index bfdb5d3e8..b11d30b3a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -45,9 +45,9 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
 
-        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(snapshotManager, 3);
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(8);
 
         write.close();
@@ -57,9 +57,8 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(snapshotManager, 3);
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
     @Test
@@ -85,11 +84,11 @@ public class FullCompactedStartingScannerTest extends 
ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
 
-        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(snapshotManager, 3);
 
         // No compact snapshot found, reading from the latest snapshot
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(4);
 
         write.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 0ab9cfbf4..20a65a04b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -50,9 +50,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
-        FullStartingScanner scanner = new FullStartingScanner();
+        FullStartingScanner scanner = new FullStartingScanner(snapshotManager);
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(2);
         assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
@@ -64,8 +64,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        FullStartingScanner scanner = new FullStartingScanner();
-        assertThat(scanner.scan(snapshotManager, snapshotReader))
-                .isInstanceOf(StartingScanner.NoSnapshot.class);
+        FullStartingScanner scanner = new FullStartingScanner(snapshotManager);
+        
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
index eb524625b..afdffc37e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
@@ -53,9 +53,10 @@ public class StaticFromTagStartingScannerTest extends 
ScannerTestBase {
 
         table.createTag("tag2", 2);
 
-        StaticFromTagStartingScanner scanner = new 
StaticFromTagStartingScanner("tag2");
+        StaticFromTagStartingScanner scanner =
+                new StaticFromTagStartingScanner(snapshotManager, "tag2");
         StartingScanner.ScannedResult result =
-                (StartingScanner.ScannedResult) scanner.scan(snapshotManager, 
snapshotReader);
+                (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(2);
         assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(
@@ -68,8 +69,7 @@ public class StaticFromTagStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNonExistingTag() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        StaticFromTagStartingScanner scanner = new 
StaticFromTagStartingScanner("non-existing");
-        assertThatThrownBy(() -> scanner.scan(snapshotManager, snapshotReader))
+        assertThatThrownBy(() -> new 
StaticFromTagStartingScanner(snapshotManager, "non-existing"))
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
                                 IllegalArgumentException.class,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 4d51c0f89..adf88ffe6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SnapshotNotExistPlan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -830,6 +831,11 @@ public class ContinuousFileSplitEnumeratorTest {
             this.nextSnapshotId = null;
         }
 
+        @Override
+        public StartingContext startingContext() {
+            return null;
+        }
+
         @Override
         public RichPlan plan() {
             Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();

Reply via email to