This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 44269407b [core] Refactor StreamTableScan interface to spark streaming 
read (#1905)
44269407b is described below

commit 44269407b62eee418a1fff1ca7706428ef504c29
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 29 10:58:42 2023 +0800

    [core] Refactor StreamTableScan interface to spark streaming read (#1905)
---
 .../paimon/operation/AbstractFileStoreScan.java    | 16 ++--
 .../org/apache/paimon/operation/FileStoreScan.java |  5 +-
 .../apache/paimon/table/source/DataFilePlan.java   |  9 +--
 .../table/source/InnerStreamTableScanImpl.java     | 22 +++--
 .../paimon/table/source/InnerTableScanImpl.java    |  2 +-
 .../{SnapshotNotExistPlan.java => RichPlan.java}   | 35 +++++---
 .../ScanKind.java => table/source/ScanMode.java}   | 16 +++-
 .../paimon/table/source/SnapshotNotExistPlan.java  | 21 ++++-
 .../paimon/table/source/StreamTableScan.java       |  8 +-
 .../source/snapshot/CompactedStartingScanner.java  |  4 +-
 .../CompactionChangelogFollowUpScanner.java        |  4 +-
 .../ContinuousAppendAndCompactFollowUpScanner.java |  4 +-
 .../ContinuousCompactorFollowUpScanner.java        |  4 +-
 .../ContinuousFromSnapshotFullStartingScanner.java |  4 +-
 .../source/snapshot/DeltaFollowUpScanner.java      |  4 +-
 .../table/source/snapshot/FullStartingScanner.java |  4 +-
 .../snapshot/IncrementalStartingScanner.java       | 29 ++++++-
 .../snapshot/InputChangelogFollowUpScanner.java    |  4 +-
 .../table/source/snapshot/SnapshotReader.java      | 20 +----
 .../table/source/snapshot/SnapshotReaderImpl.java  | 27 +++++--
 .../table/source/snapshot/StartingScanner.java     | 29 +++----
 .../StaticFromSnapshotStartingScanner.java         |  4 +-
 .../snapshot/StaticFromTagStartingScanner.java     |  4 +-
 .../StaticFromTimestampStartingScanner.java        |  4 +-
 .../apache/paimon/table/system/AuditLogTable.java  | 18 ++---
 .../test/java/org/apache/paimon/TestFileStore.java |  6 +-
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  6 +-
 .../ChangelogValueCountFileStoreTableTest.java     | 10 +--
 .../table/ChangelogWithKeyFileDataTableTest.java   | 10 +--
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 18 ++---
 .../paimon/table/FileDataFilterTestBase.java       | 14 ++--
 .../paimon/table/source/StartupModeTest.java       | 37 +++++----
 .../flink/source/operator/MonitorFunction.java     |  6 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 93 ++++++++++++++--------
 .../source/FileStoreSourceSplitGeneratorTest.java  |  6 ++
 35 files changed, 302 insertions(+), 205 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 45f6bd153..2ea41bd6b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -32,6 +32,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
@@ -72,7 +73,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Snapshot specifiedSnapshot = null;
     private Filter<Integer> bucketFilter = null;
     private List<ManifestFileMeta> specifiedManifests = null;
-    private ScanKind scanKind = ScanKind.ALL;
+    private ScanMode scanMode = ScanMode.ALL;
     private Filter<Integer> levelFilter = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
@@ -169,8 +170,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withKind(ScanKind scanKind) {
-        this.scanKind = scanKind;
+    public FileStoreScan withKind(ScanMode scanMode) {
+        this.scanMode = scanMode;
         return this;
     }
 
@@ -207,6 +208,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 return readSnapshot == null ? null : readSnapshot.id();
             }
 
+            @Override
+            public ScanMode scanMode() {
+                return scanMode;
+            }
+
             @Override
             public List<ManifestEntry> files() {
                 return files;
@@ -274,7 +280,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
-        switch (scanKind) {
+        switch (scanMode) {
             case ALL:
                 return snapshot.dataManifests(manifestList);
             case DELTA:
@@ -294,7 +300,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                                 "Incremental scan does not accept %s snapshot",
                                 snapshot.commitKind()));
             default:
-                throw new UnsupportedOperationException("Unknown scan kind " + 
scanKind.name());
+                throw new UnsupportedOperationException("Unknown scan kind " + 
scanMode.name());
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 441b6eef7..9fce20710 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
@@ -55,7 +56,7 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
 
-    FileStoreScan withKind(ScanKind scanKind);
+    FileStoreScan withKind(ScanMode scanMode);
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
@@ -77,6 +78,8 @@ public interface FileStoreScan {
         @Nullable
         Long snapshotId();
 
+        ScanMode scanMode();
+
         /** Result {@link ManifestEntry} files. */
         List<ManifestEntry> files();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index 6c113f2df..d1355f1bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -38,10 +38,9 @@ public class DataFilePlan implements TableScan.Plan {
         return new ArrayList<>(splits);
     }
 
-    public static DataFilePlan fromResult(StartingScanner.Result result) {
-        return new DataFilePlan(
-                result instanceof StartingScanner.ScannedResult
-                        ? ((StartingScanner.ScannedResult) result).splits()
-                        : Collections.emptyList());
+    public static TableScan.Plan fromResult(StartingScanner.Result result) {
+        return result instanceof StartingScanner.ScannedResult
+                ? ((StartingScanner.ScannedResult) result).plan()
+                : new DataFilePlan(Collections.emptyList());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 3cf3d9b10..eb8566b65 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -55,7 +55,6 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     private FollowUpScanner followUpScanner;
     private BoundedChecker boundedChecker;
     private boolean isFullPhaseEnd = false;
-    @Nullable private Long currentWatermark;
     @Nullable private Long nextSnapshotId;
 
     public InnerStreamTableScanImpl(
@@ -78,7 +77,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     }
 
     @Override
-    public Plan plan() {
+    public RichPlan plan() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(true);
         }
@@ -96,16 +95,15 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
         }
     }
 
-    private Plan tryFirstPlan() {
+    private RichPlan tryFirstPlan() {
         StartingScanner.Result result = startingScanner.scan(snapshotManager, 
snapshotReader);
         if (result instanceof ScannedResult) {
             ScannedResult scannedResult = (ScannedResult) result;
-            currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
             nextSnapshotId = currentSnapshotId + 1;
             isFullPhaseEnd =
                     
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
-            return DataFilePlan.fromResult(result);
+            return scannedResult.plan();
         } else if (result instanceof StartingScanner.NextSnapshot) {
             nextSnapshotId = ((StartingScanner.NextSnapshot) 
result).nextSnapshotId();
             isFullPhaseEnd =
@@ -116,7 +114,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
         return SnapshotNotExistPlan.INSTANCE;
     }
 
-    private Plan nextPlan() {
+    private RichPlan nextPlan() {
         while (true) {
             if (isFullPhaseEnd) {
                 throw new EndOfScanException();
@@ -150,13 +148,11 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
                 LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
                 SnapshotReader.Plan overwritePlan =
                         
followUpScanner.getOverwriteChangesPlan(nextSnapshotId, snapshotReader);
-                currentWatermark = overwritePlan.watermark();
                 nextSnapshotId++;
                 return overwritePlan;
             } else if (followUpScanner.shouldScanSnapshot(snapshot)) {
                 LOG.debug("Find snapshot id {}.", nextSnapshotId);
                 SnapshotReader.Plan plan = 
followUpScanner.scan(nextSnapshotId, snapshotReader);
-                currentWatermark = plan.watermark();
                 nextSnapshotId++;
                 return plan;
             } else {
@@ -218,15 +214,15 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
         return nextSnapshotId;
     }
 
-    @Nullable
     @Override
-    public Long watermark() {
-        return currentWatermark;
+    public void restore(@Nullable Long nextSnapshotId) {
+        this.nextSnapshotId = nextSnapshotId;
     }
 
     @Override
-    public void restore(@Nullable Long nextSnapshotId) {
-        this.nextSnapshotId = nextSnapshotId;
+    public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
+        restore(nextSnapshotId);
+        snapshotReader.withMode(scanMode);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index ea19a1ab9..dc01bd429 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -52,7 +52,7 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
     }
 
     @Override
-    public DataFilePlan plan() {
+    public TableScan.Plan plan() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(false);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
 b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
similarity index 63%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
index df58c8067..e592f4648 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
@@ -18,19 +18,30 @@
 
 package org.apache.paimon.table.source;
 
-import java.util.Collections;
-import java.util.List;
+import org.apache.paimon.annotation.Public;
 
-/** This is used to distinguish the case where the snapshot does not exist and 
the plan is empty. */
-public class SnapshotNotExistPlan implements TableScan.Plan {
-    public static final SnapshotNotExistPlan INSTANCE = new 
SnapshotNotExistPlan();
+import javax.annotation.Nullable;
 
-    private SnapshotNotExistPlan() {
-        // private
-    }
+/**
+ * Rich Plan of scan.
+ *
+ * @since 0.6.0
+ */
+@Public
+public interface RichPlan extends TableScan.Plan {
+
+    /** Current watermark for consumed snapshot. */
+    @Nullable
+    Long watermark();
+
+    /**
+     * Snapshot id of this plan.
+     *
+     * @return null if the table is empty.
+     */
+    @Nullable
+    Long snapshotId();
 
-    @Override
-    public List<Split> splits() {
-        return Collections.emptyList();
-    }
+    /** Scan which part of the snapshot. */
+    ScanMode scanMode();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
similarity index 84%
rename from paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
index 7d0818aaa..10bc47c83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
@@ -16,14 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Scan which part of the snapshot.
+ *
+ * @since 0.6.0
+ */
+@Public
+public enum ScanMode {
 
-/** Scan which part of the snapshot. */
-public enum ScanKind {
     /** Scan complete data files of a snapshot. */
     ALL,
+
     /** Only scan newly changed files of a snapshot. */
     DELTA,
+
     /** Only scan changelog files of a snapshot. */
     CHANGELOG
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
index df58c8067..358a6ce7e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.table.source;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 
 /** This is used to distinguish the case where the snapshot does not exist and 
the plan is empty. */
-public class SnapshotNotExistPlan implements TableScan.Plan {
+public class SnapshotNotExistPlan implements RichPlan {
     public static final SnapshotNotExistPlan INSTANCE = new 
SnapshotNotExistPlan();
 
     private SnapshotNotExistPlan() {
@@ -33,4 +35,21 @@ public class SnapshotNotExistPlan implements TableScan.Plan {
     public List<Split> splits() {
         return Collections.emptyList();
     }
+
+    @Nullable
+    @Override
+    public Long watermark() {
+        return null;
+    }
+
+    @Nullable
+    @Override
+    public Long snapshotId() {
+        return null;
+    }
+
+    @Override
+    public ScanMode scanMode() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index c0a900bcf..869937137 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -33,14 +33,16 @@ import javax.annotation.Nullable;
 @Public
 public interface StreamTableScan extends TableScan, Restorable<Long> {
 
-    /** Current watermark for consumed snapshot. */
-    @Nullable
-    Long watermark();
+    @Override
+    RichPlan plan();
 
     /** Restore from checkpoint next snapshot id. */
     @Override
     void restore(@Nullable Long nextSnapshotId);
 
+    /** Restore from checkpoint next snapshot id with scan kind. */
+    void restore(@Nullable Long nextSnapshotId, ScanMode scanMode);
+
     /** Checkpoint to return next snapshot id. */
     @Nullable
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index 0f89c5b42..a1d9d90a0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -49,7 +49,7 @@ public class CompactedStartingScanner implements 
StartingScanner {
         }
 
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index 05a23d1be..b74052822 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,6 @@ public class CompactionChangelogFollowUpScanner implements 
FollowUpScanner {
 
     @Override
     public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).read();
+        return 
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshotId).read();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
index 7824c75f0..342015612 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,6 @@ public class ContinuousAppendAndCompactFollowUpScanner 
implements FollowUpScanne
 
     @Override
     public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index caaf40ddf..7564d5372 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class ContinuousCompactorFollowUpScanner implements 
FollowUpScanner {
 
     @Override
     public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index d524b7883..0a1a2c195 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 /**
@@ -41,6 +41,6 @@ public class ContinuousFromSnapshotFullStartingScanner 
implements StartingScanne
         }
         long ceiledSnapshotId = Math.max(snapshotId, earliestSnapshotId);
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(ceiledSnapshotId).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index edcbada93..48d8d03fd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
 
     @Override
     public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index b331f2a21..a03a707be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -38,6 +38,6 @@ public class FullStartingScanner implements StartingScanner {
             return new NoSnapshot();
         }
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index e3ba10691..fbdf8a858 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -22,8 +22,9 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -77,7 +78,29 @@ public class IncrementalStartingScanner implements 
StartingScanner {
             }
         }
 
-        return new ScannedResult(end, null, result);
+        return StartingScanner.fromPlan(
+                new SnapshotReader.Plan() {
+                    @Override
+                    public Long watermark() {
+                        return null;
+                    }
+
+                    @Override
+                    public Long snapshotId() {
+                        return end;
+                    }
+
+                    @Override
+                    public ScanMode scanMode() {
+                        // TODO introduce a new mode
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public List<Split> splits() {
+                        return (List) result;
+                    }
+                });
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -86,6 +109,6 @@ public class IncrementalStartingScanner implements 
StartingScanner {
             // ignore COMPACT and OVERWRITE
             return Collections.emptyList();
         }
-        return (List) 
reader.withSnapshot(s).withKind(ScanKind.DELTA).read().splits();
+        return (List) 
reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index 4b188f6fe..adb49c4d0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class InputChangelogFollowUpScanner implements 
FollowUpScanner {
 
     @Override
     public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).read();
+        return 
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshotId).read();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 0cee60086..7cba985f8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,17 +21,15 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 
 /** Read splits from specified {@link Snapshot} with given configuration. */
@@ -49,7 +47,7 @@ public interface SnapshotReader {
 
     SnapshotReader withFilter(Predicate predicate);
 
-    SnapshotReader withKind(ScanKind scanKind);
+    SnapshotReader withMode(ScanMode scanMode);
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
@@ -69,17 +67,7 @@ public interface SnapshotReader {
     List<BinaryRow> partitions();
 
     /** Result plan of this scan. */
-    interface Plan extends TableScan.Plan {
-
-        @Nullable
-        Long watermark();
-
-        /**
-         * Snapshot id of this plan, return null if the table is empty or the 
manifest list is
-         * specified.
-         */
-        @Nullable
-        Long snapshotId();
+    interface Plan extends RichPlan {
 
         /** Result splits. */
         List<Split> splits();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 99d2b7291..b335524d5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -30,11 +30,11 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.Filter;
@@ -69,7 +69,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final BiConsumer<FileStoreScan, Predicate> 
nonPartitionFilterConsumer;
     private final DefaultValueAssigner defaultValueAssigner;
 
-    private ScanKind scanKind = ScanKind.ALL;
+    private ScanMode scanMode = ScanMode.ALL;
     private RecordComparator lazyPartitionComparator;
 
     public SnapshotReaderImpl(
@@ -149,9 +149,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
     }
 
     @Override
-    public SnapshotReader withKind(ScanKind scanKind) {
-        this.scanKind = scanKind;
-        scan.withKind(scanKind);
+    public SnapshotReader withMode(ScanMode scanMode) {
+        this.scanMode = scanMode;
+        scan.withKind(scanMode);
         return this;
     }
 
@@ -191,7 +191,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         List<DataSplit> splits =
                 generateSplits(
                         snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : 
snapshotId,
-                        scanKind != ScanKind.ALL,
+                        scanMode != ScanMode.ALL,
                         splitGenerator,
                         files);
         return new Plan() {
@@ -207,6 +207,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 return plan.snapshotId();
             }
 
+            @Override
+            public ScanMode scanMode() {
+                return plan.scanMode();
+            }
+
             @Override
             public List<Split> splits() {
                 return (List) splits;
@@ -234,7 +239,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
     /** Get splits from an overwritten snapshot files. */
     @Override
     public Plan readOverwrittenChanges() {
-        withKind(ScanKind.DELTA);
+        withMode(ScanMode.DELTA);
         FileStoreScan.Plan plan = scan.plan();
         long snapshotId = plan.snapshotId();
 
@@ -309,6 +314,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 return plan.snapshotId();
             }
 
+            @Override
+            public ScanMode scanMode() {
+                // TODO introduce a new mode
+                throw new UnsupportedOperationException();
+            }
+
             @Override
             public List<Split> splits() {
                 return (List) splits;
@@ -318,7 +329,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public Plan readIncrementalDiff(Snapshot before) {
-        withKind(ScanKind.ALL);
+        withMode(ScanMode.ALL);
         FileStoreScan.Plan plan = scan.plan();
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 807930be7..af44a8b8a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -22,8 +22,6 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 
 /** Helper class for the first planning of {@link TableScan}. */
@@ -38,33 +36,28 @@ public interface StartingScanner {
     class NoSnapshot implements Result {}
 
     static ScannedResult fromPlan(SnapshotReader.Plan plan) {
-        return new ScannedResult(plan.snapshotId(), plan.watermark(), (List) 
plan.splits());
+        return new ScannedResult(plan);
     }
 
     /** Result with scanned snapshot. Next snapshot should be the current 
snapshot plus 1. */
     class ScannedResult implements Result {
-        private final long currentSnapshotId;
-        @Nullable private final Long currentWatermark;
-        private final List<DataSplit> splits;
-
-        public ScannedResult(
-                long currentSnapshotId, @Nullable Long currentWatermark, 
List<DataSplit> splits) {
-            this.currentSnapshotId = currentSnapshotId;
-            this.currentWatermark = currentWatermark;
-            this.splits = splits;
+
+        private final SnapshotReader.Plan plan;
+
+        public ScannedResult(SnapshotReader.Plan plan) {
+            this.plan = plan;
         }
 
         public long currentSnapshotId() {
-            return currentSnapshotId;
+            return plan.snapshotId();
         }
 
-        @Nullable
-        public Long currentWatermark() {
-            return currentWatermark;
+        public List<DataSplit> splits() {
+            return (List) plan.splits();
         }
 
-        public List<DataSplit> splits() {
-            return splits;
+        public SnapshotReader.Plan plan() {
+            return plan;
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index cc4f2907b..1157e22d2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 /**
@@ -40,6 +40,6 @@ public class StaticFromSnapshotStartingScanner implements 
StartingScanner {
             return new NoSnapshot();
         }
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index 13c5e95cd..b402851a1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -40,6 +40,6 @@ public class StaticFromTagStartingScanner implements 
StartingScanner {
         Snapshot snapshot = tagManager.taggedSnapshot(tagName);
 
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshot).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 34b0f0974..315b749df 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public class StaticFromTimestampStartingScanner implements 
StartingScanner {
             return new NoSnapshot();
         }
         return StartingScanner.fromPlan(
-                
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshot.id()).read());
+                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshot.id()).read());
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index ba8a4b4cc..e8e90df1b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,7 +27,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -40,6 +39,8 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableRead;
@@ -225,8 +226,8 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
-        public SnapshotReader withKind(ScanKind scanKind) {
-            snapshotReader.withKind(scanKind);
+        public SnapshotReader withMode(ScanMode scanMode) {
+            snapshotReader.withMode(scanMode);
             return this;
         }
 
@@ -307,7 +308,7 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
 
         @Override
-        public Plan plan() {
+        public RichPlan plan() {
             return streamScan.plan();
         }
 
@@ -322,15 +323,14 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return streamScan.checkpoint();
         }
 
-        @Nullable
         @Override
-        public Long watermark() {
-            return streamScan.watermark();
+        public void restore(@Nullable Long nextSnapshotId) {
+            streamScan.restore(nextSnapshotId);
         }
 
         @Override
-        public void restore(@Nullable Long nextSnapshotId) {
-            streamScan.restore(nextSnapshotId);
+        public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
+            streamScan.restore(nextSnapshotId, scanMode);
         }
 
         @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 9b5441fd5..f545f7a28 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -39,7 +39,6 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
@@ -47,6 +46,7 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -339,8 +339,8 @@ public class TestFileStore extends KeyValueFileStore {
                             .withKind(
                                     options.changelogProducer()
                                                     == 
CoreOptions.ChangelogProducer.NONE
-                                            ? ScanKind.DELTA
-                                            : ScanKind.CHANGELOG)
+                                            ? ScanMode.DELTA
+                                            : ScanMode.CHANGELOG)
                             .withSnapshot(snapshotId)
                             .plan()
                             .files();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 2272b7af3..5c99df4aa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -38,6 +37,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
@@ -213,7 +213,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -231,7 +231,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         List<Split> splits =
                 toSplits(
                         table.newSnapshotReader()
-                                .withKind(ScanKind.DELTA)
+                                .withMode(ScanMode.DELTA)
                                 .withFilter(predicate)
                                 .read()
                                 .dataSplits());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
index 59bec0dec..6fd275083 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -34,6 +33,7 @@ import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowKind;
@@ -109,7 +109,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -130,7 +130,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -148,7 +148,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         List<Split> splits =
                 toSplits(
                         table.newSnapshotReader()
-                                .withKind(ScanKind.DELTA)
+                                .withMode(ScanMode.DELTA)
                                 .withFilter(predicate)
                                 .read()
                                 .dataSplits());
@@ -203,7 +203,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
 
         // check that no data file is produced
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         assertThat(splits).isEmpty();
         // check that no changelog file is produced
         Path bucketPath = DataFilePathFactory.bucketPath(table.location(), 
"1", 0);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
index b10fadf61..565262f39 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
@@ -20,10 +20,10 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.WriteMode;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
@@ -153,7 +153,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
                     // filter with "b" = 15 in schema0
@@ -174,7 +174,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
 
@@ -212,7 +212,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
                     // filter with "kt" = 116 in schema0
@@ -230,7 +230,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index ce901586c..278305c38 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -28,7 +28,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -48,6 +47,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
@@ -253,7 +253,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -273,7 +273,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits =
-                
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+                
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
         TableRead read = table.newRead().withProjection(PROJECTION);
 
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -292,7 +292,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         List<Split> splits =
                 toSplits(
                         table.newSnapshotReader()
-                                .withKind(ScanKind.DELTA)
+                                .withMode(ScanMode.DELTA)
                                 .withFilter(predicate)
                                 .read()
                                 .dataSplits());
@@ -328,7 +328,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         List<Split> splits =
                 toSplits(
-                        
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+                        
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
@@ -377,7 +377,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         List<Split> splits =
                 toSplits(
-                        
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+                        
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
@@ -400,7 +400,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         splits =
                 toSplits(
-                        
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+                        
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder("+I 
1|30|130|binary|varbinary|mapKey:mapVal|multiset");
         assertThat(getResult(read, splits, binaryRow(2), 0, 
CHANGELOG_ROW_TO_STRING))
@@ -426,7 +426,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         splits =
                 toSplits(
-                        
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+                        
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
                         "-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
@@ -709,7 +709,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 20, 200L));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        SnapshotReader snapshotReader = 
table.newSnapshotReader().withKind(ScanKind.DELTA);
+        SnapshotReader snapshotReader = 
table.newSnapshotReader().withMode(ScanMode.DELTA);
         List<DataSplit> splits0 = snapshotReader.read().dataSplits();
         assertThat(splits0).hasSize(1);
         assertThat(splits0.get(0).dataFiles()).hasSize(1);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
index a68f2594c..039a968cb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
@@ -19,12 +19,12 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.IsNull;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataTypes;
@@ -341,7 +341,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
                     TableRead read = table.newRead();
@@ -359,7 +359,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
 
@@ -384,7 +384,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
                     // project "c", "b", "pt" in schema0
@@ -400,7 +400,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
 
@@ -424,7 +424,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
                     // filter with "b" = 15 in schema0
@@ -442,7 +442,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
-                                            .withKind(ScanKind.DELTA)
+                                            .withMode(ScanMode.DELTA)
                                             .read()
                                             .dataSplits());
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 724d255bb..9ce46e77b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
@@ -76,13 +75,13 @@ public class StartupModeTest extends ScannerTestBase {
         writeAndCommit(4, rowData(1, 10, 103L));
         TableScan.Plan thirdPlan = dataTableScan.plan();
         assertThat(thirdPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -96,20 +95,20 @@ public class StartupModeTest extends ScannerTestBase {
         TableScan.Plan secondPlan = dataTableScan.plan();
 
         assertThat(firstPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.ALL).read().splits());
         assertThat(secondPlan.splits()).isEmpty();
 
         // write next data
         writeAndCommit(4, rowData(1, 10, 103L));
         TableScan.Plan thirdPlan = dataTableScan.plan();
         assertThat(thirdPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -135,13 +134,13 @@ public class StartupModeTest extends ScannerTestBase {
 
         assertThat(firstPlan.splits()).isEmpty();
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = readTable.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -159,15 +158,15 @@ public class StartupModeTest extends ScannerTestBase {
         TableScan.Plan secondPlan = dataTableScan.plan();
 
         assertThat(firstPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(5).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(5).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -184,13 +183,13 @@ public class StartupModeTest extends ScannerTestBase {
 
         assertThat(firstPlan.splits()).isEmpty();
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -212,7 +211,7 @@ public class StartupModeTest extends ScannerTestBase {
         assertThat(firstPlan.splits()).isEmpty();
         // ceiled up to the earliest snapshot id = 2
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
@@ -232,15 +231,15 @@ public class StartupModeTest extends ScannerTestBase {
         TableScan.Plan secondPlan = dataTableScan.plan();
 
         assertThat(firstPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
         TableScan.Plan plan = batchScan.plan();
         assertThat(plan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
     }
 
     @Test
@@ -260,9 +259,9 @@ public class StartupModeTest extends ScannerTestBase {
 
         // ceiled up to the earliest snapshot id = 2
         assertThat(firstPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
         assertThat(secondPlan.splits())
-                
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.DELTA).read().splits());
+                
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.DELTA).read().splits());
 
         // batch mode
         TableScan batchScan = table.newScan();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 30ac622ce..2905d8a30 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.RichPlan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 
@@ -180,12 +181,13 @@ public class MonitorFunction extends 
RichSourceFunction<Split>
                     return;
                 }
                 try {
-                    List<Split> splits = scan.plan().splits();
+                    RichPlan plan = scan.plan();
+                    List<Split> splits = plan.splits();
                     isEmpty = splits.isEmpty();
                     splits.forEach(ctx::collect);
 
                     if (emitSnapshotWatermark) {
-                        Long watermark = scan.watermark();
+                        Long watermark = plan.watermark();
                         if (watermark != null) {
                             ctx.emitWatermark(new Watermark(watermark));
                         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index d02cd2b89..4d51c0f89 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,12 +21,13 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -200,7 +201,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -216,7 +217,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -269,7 +270,7 @@ public class ContinuousFileSplitEnumeratorTest {
                 new TestingSplitEnumeratorContext<>(3);
         context.registerReader(0, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -284,7 +285,7 @@ public class ContinuousFileSplitEnumeratorTest {
         long snapshot = 0;
         List<DataSplit> splits = new ArrayList<>();
         splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -296,7 +297,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         splits.clear();
         splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
-        results.put(2L, new DataFilePlan(splits));
+        results.put(2L, new MockPlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -315,7 +316,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -332,7 +333,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -378,7 +379,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -407,7 +408,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
         // trigger assign task 0 and task 1 will get their assignment
         context.triggerAllActions();
 
@@ -436,7 +437,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -453,7 +454,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -477,7 +478,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -495,7 +496,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
 
         context.registeredReaders().remove(1);
         // assign to task 0
@@ -510,7 +511,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         scan.allowEnd(false);
         ContinuousFileSplitEnumerator enumerator =
@@ -527,7 +528,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
 
         // request directly
         enumerator.handleSplitRequest(0, "test-host");
@@ -555,7 +556,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         scan.allowEnd(false);
         ContinuousFileSplitEnumerator enumerator =
@@ -574,7 +575,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 2; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new DataFilePlan(splits));
+        results.put(1L, new MockPlan(splits));
 
         // will not trigger scan here
         enumerator.handleSplitRequest(0, "test-host");
@@ -602,7 +603,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 2; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(2L, new DataFilePlan(splits));
+        results.put(2L, new MockPlan(splits));
         // because blockScanByRequest = false, so this request will trigger 
scan
         enumerator.handleSplitRequest(2, "test-host");
         context.getExecutorService().triggerAllNonPeriodicTasks();
@@ -620,7 +621,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.getExecutorService().triggerAllNonPeriodicTasks();
         splits.clear();
         splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
-        results.put(3L, new DataFilePlan(splits));
+        results.put(3L, new MockPlan(splits));
 
         // this won't trigger scan, cause blockScanByRequest = true
         enumerator.handleSplitRequest(3, "test-host");
@@ -647,13 +648,13 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
 
         // prepare test data
-        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        TreeMap<Long, RichPlan> results = new TreeMap<>();
         Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
         StreamTableScan scan = new MockScan(results);
         for (int i = 1; i <= 4; i++) {
             List<DataSplit> dataSplits =
                     Collections.singletonList(createDataSplit(i, 0, 
Collections.emptyList()));
-            results.put((long) i, new DataFilePlan(dataSplits));
+            results.put((long) i, new MockPlan(dataSplits));
             expectedResults.put((long) i, dataSplits);
         }
 
@@ -787,20 +788,51 @@ public class ContinuousFileSplitEnumeratorTest {
         }
     }
 
+    private static class MockPlan implements RichPlan {
+
+        private final List<DataSplit> splits;
+
+        public MockPlan(List<DataSplit> splits) {
+            this.splits = splits;
+        }
+
+        @Nullable
+        @Override
+        public Long watermark() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Nullable
+        @Override
+        public Long snapshotId() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ScanMode scanMode() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<Split> splits() {
+            return (List) splits;
+        }
+    }
+
     private static class MockScan implements StreamTableScan {
 
-        private final TreeMap<Long, Plan> results;
+        private final TreeMap<Long, RichPlan> results;
         private @Nullable Long nextSnapshotId;
         private boolean allowEnd = true;
 
-        public MockScan(TreeMap<Long, Plan> results) {
+        public MockScan(TreeMap<Long, RichPlan> results) {
             this.results = results;
             this.nextSnapshotId = null;
         }
 
         @Override
-        public Plan plan() {
-            Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
+        public RichPlan plan() {
+            Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();
             if (planEntry == null) {
                 if (allowEnd) {
                     throw new EndOfScanException();
@@ -825,14 +857,11 @@ public class ContinuousFileSplitEnumeratorTest {
         @Override
         public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
 
-        @Nullable
         @Override
-        public Long watermark() {
-            return null;
-        }
+        public void restore(Long state) {}
 
         @Override
-        public void restore(Long state) {}
+        public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) 
{}
 
         public void allowEnd(boolean allowEnd) {
             this.allowEnd = allowEnd;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 2fb1f090a..880f3288e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 
@@ -59,6 +60,11 @@ public class FileStoreSourceSplitGeneratorTest {
                         return 1L;
                     }
 
+                    @Override
+                    public ScanMode scanMode() {
+                        return ScanMode.ALL;
+                    }
+
                     @Override
                     public List<ManifestEntry> files() {
                         return Arrays.asList(

Reply via email to