This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 23dad789 [FLINK-30207] Move split initialization and discovery logic fully into SnapshotEnumerator in Table Store
23dad789 is described below

commit 23dad789879735249651992ab9fe23b986ef1564
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 5 10:24:00 2022 +0800

    [FLINK-30207] Move split initialization and discovery logic fully into SnapshotEnumerator in Table Store
    
    This closes #411
---
 .../connector/lookup/FileStoreLookupFunction.java  |   6 +-
 .../source/ContinuousFileSplitEnumerator.java      |  38 ++-
 ...eSource.java => ContinuousFileStoreSource.java} |  60 ++---
 .../store/connector/source/FlinkSourceBuilder.java |  16 +-
 .../connector/source/PendingSplitsCheckpoint.java  |  10 +-
 .../source/PendingSplitsCheckpointSerializer.java  |  13 +-
 ...StoreSource.java => StaticFileStoreSource.java} |  43 +---
 .../source/StaticFileStoreSplitEnumerator.java     |   6 +-
 .../store/connector/source/TableStoreSource.java   |   9 +-
 .../table/store/connector/FileStoreITCase.java     |   8 +-
 .../PendingSplitsCheckpointSerializerTest.java     |   8 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   1 -
 .../file/operation/AbstractFileStoreScan.java      |  42 +---
 .../file/operation/AppendOnlyFileStoreScan.java    |   2 -
 .../table/store/file/operation/FileStoreScan.java  |   2 +-
 .../file/operation/KeyValueFileStoreScan.java      |   3 -
 .../flink/table/store/file/operation/ScanKind.java |  29 +++
 .../table/store/table/source/DataTableScan.java    |  13 +-
 .../store/table/source/SnapshotEnumerator.java     | 150 ------------
 .../store/table/source/TableStreamingReader.java   |  43 ++--
 .../snapshot/DataFileSnapshotEnumerator.java       | 159 +++++++++++++
 .../source/snapshot/DeltaSnapshotEnumerator.java   |  66 ++++++
 .../FullCompactionChangelogSnapshotEnumerator.java |  72 ++++++
 .../snapshot/InputChangelogSnapshotEnumerator.java |  70 ++++++
 .../table/source/snapshot/SnapshotEnumerator.java  |  50 ++--
 .../flink/table/store/file/TestFileStore.java      |  11 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |   7 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   9 +-
 .../table/ChangelogWithKeyFileDataTableTest.java   |   9 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  39 ++--
 .../table/store/table/FileDataFilterTestBase.java  |  13 +-
 .../DataFileSnapshotEnumeratorTestBase.java        | 132 +++++++++++
 .../snapshot/DeltaSnapshotEnumeratorTest.java      | 234 +++++++++++++++++++
 ...lCompactionChangelogSnapshotEnumeratorTest.java | 256 +++++++++++++++++++++
 .../InputChangelogSnapshotEnumeratorTest.java      | 239 +++++++++++++++++++
 35 files changed, 1447 insertions(+), 421 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index 92e663e9..4f565fe0 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -156,7 +156,7 @@ public class FileStoreLookupFunction extends 
TableFunction<RowData> {
 
     /** Used by code generation. */
     @SuppressWarnings("unused")
-    public void eval(Object... values) throws IOException {
+    public void eval(Object... values) throws Exception {
         checkRefresh();
         List<RowData> results = lookupTable.get(GenericRowData.of(values));
         for (RowData matchedRow : results) {
@@ -164,7 +164,7 @@ public class FileStoreLookupFunction extends 
TableFunction<RowData> {
         }
     }
 
-    private void checkRefresh() throws IOException {
+    private void checkRefresh() throws Exception {
         if (nextLoadTime > System.currentTimeMillis()) {
             return;
         }
@@ -179,7 +179,7 @@ public class FileStoreLookupFunction extends 
TableFunction<RowData> {
         nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
     }
 
-    private void refresh() throws IOException {
+    private void refresh() throws Exception {
         while (true) {
             Iterator<RowData> batch = streamingReader.nextBatch();
             if (batch == null) {
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index e2614bc7..2a6102a4 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -21,12 +21,9 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
-import 
org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +41,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
-import static 
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -58,6 +54,8 @@ public class ContinuousFileSplitEnumerator
 
     private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
 
+    private Long nextSnapshotId;
+
     private final long discoveryInterval;
 
     private final Set<Integer> readersAwaitingSplit;
@@ -66,26 +64,21 @@ public class ContinuousFileSplitEnumerator
 
     private final SnapshotEnumerator snapshotEnumerator;
 
-    private Long currentSnapshotId;
-
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
-            Path location,
-            DataTableScan scan,
-            CoreOptions.ChangelogProducer changelogProducer,
             Collection<FileStoreSourceSplit> remainSplits,
-            long currentSnapshotId,
-            long discoveryInterval) {
+            Long nextSnapshotId,
+            long discoveryInterval,
+            SnapshotEnumerator snapshotEnumerator) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.bucketSplits = new HashMap<>();
         addSplits(remainSplits);
-        this.currentSnapshotId = currentSnapshotId;
+        this.nextSnapshotId = nextSnapshotId;
         this.discoveryInterval = discoveryInterval;
         this.readersAwaitingSplit = new HashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
-        this.snapshotEnumerator =
-                new SnapshotEnumerator(location, scan, changelogProducer, 
currentSnapshotId);
+        this.snapshotEnumerator = snapshotEnumerator;
     }
 
     private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -101,10 +94,7 @@ public class ContinuousFileSplitEnumerator
     @Override
     public void start() {
         context.callAsync(
-                snapshotEnumerator,
-                this::processDiscoveredSplits,
-                discoveryInterval,
-                discoveryInterval);
+                snapshotEnumerator::enumerate, this::processDiscoveredSplits, 
0, discoveryInterval);
     }
 
     @Override
@@ -139,8 +129,7 @@ public class ContinuousFileSplitEnumerator
         List<FileStoreSourceSplit> splits = new ArrayList<>();
         bucketSplits.values().forEach(splits::addAll);
         final PendingSplitsCheckpoint checkpoint =
-                new PendingSplitsCheckpoint(
-                        splits, currentSnapshotId == null ? INVALID_SNAPSHOT : 
currentSnapshotId);
+                new PendingSplitsCheckpoint(splits, nextSnapshotId);
 
         LOG.debug("Source Checkpoint is {}", checkpoint);
         return checkpoint;
@@ -148,7 +137,8 @@ public class ContinuousFileSplitEnumerator
 
     // ------------------------------------------------------------------------
 
-    private void processDiscoveredSplits(@Nullable EnumeratorResult result, 
Throwable error) {
+    private void processDiscoveredSplits(
+            @Nullable DataTableScan.DataFilePlan result, Throwable error) {
         if (error != null) {
             LOG.error("Failed to enumerate files", error);
             return;
@@ -158,8 +148,8 @@ public class ContinuousFileSplitEnumerator
             return;
         }
 
-        currentSnapshotId = result.snapshotId;
-        addSplits(splitGenerator.createSplits(result.plan));
+        nextSnapshotId = result.snapshotId + 1;
+        addSplits(splitGenerator.createSplits(result));
         assignSplits();
     }
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
similarity index 51%
copy from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
copy to 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index 573e5f98..581d746c 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -19,91 +19,63 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
+import 
org.apache.flink.table.store.table.source.snapshot.DataFileSnapshotEnumerator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
-import static 
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** {@link Source} of file store. */
-public class FileStoreSource extends FlinkSource {
+/** Unbounded {@link FlinkSource} for reading records. It continuously 
monitors new snapshots. */
+public class ContinuousFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
-
-    private final boolean isContinuous;
-
     private final long discoveryInterval;
 
-    public FileStoreSource(
+    public ContinuousFileStoreSource(
             FileStoreTable table,
-            boolean isContinuous,
             long discoveryInterval,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
         super(table, projectedFields, predicate, limit);
         this.table = table;
-        this.isContinuous = isContinuous;
         this.discoveryInterval = discoveryInterval;
     }
 
     @Override
     public Boundedness getBoundedness() {
-        return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED;
+        return Boundedness.CONTINUOUS_UNBOUNDED;
     }
 
     @Override
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> 
restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
-        SnapshotManager snapshotManager = table.snapshotManager();
         DataTableScan scan = table.newScan();
         if (predicate != null) {
             scan.withFilter(predicate);
         }
 
-        Long snapshotId;
-        Collection<FileStoreSourceSplit> splits;
-        if (checkpoint == null) {
-            DataFilePlan plan = isContinuous ? 
SnapshotEnumerator.startup(scan) : scan.plan();
-            snapshotId = plan.snapshotId;
-            splits = new FileStoreSourceSplitGenerator().createSplits(plan);
-        } else {
-            // restore from checkpoint
-            snapshotId = checkpoint.currentSnapshotId();
-            if (snapshotId == INVALID_SNAPSHOT) {
-                snapshotId = null;
-            }
+        Long nextSnapshotId = null;
+        Collection<FileStoreSourceSplit> splits = new ArrayList<>();
+        if (checkpoint != null) {
+            nextSnapshotId = checkpoint.currentSnapshotId();
             splits = checkpoint.splits();
         }
 
-        // create enumerator from snapshotId and splits
-        if (isContinuous) {
-            long currentSnapshot = snapshotId == null ? 
Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
-            return new ContinuousFileSplitEnumerator(
-                    context,
-                    table.location(),
-                    scan.withIncremental(true), // the subsequent planning is 
all incremental
-                    table.options().changelogProducer(),
-                    splits,
-                    currentSnapshot,
-                    discoveryInterval);
-        } else {
-            Snapshot snapshot = snapshotId == null ? null : 
snapshotManager.snapshot(snapshotId);
-            return new StaticFileStoreSplitEnumerator(context, snapshot, 
splits);
-        }
+        return new ContinuousFileSplitEnumerator(
+                context,
+                splits,
+                nextSnapshotId,
+                discoveryInterval,
+                DataFileSnapshotEnumerator.create(table, scan, 
nextSnapshotId));
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index d5197542..57b544e4 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -117,9 +117,13 @@ public class FlinkSourceBuilder {
         return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
     }
 
-    private FileStoreSource buildFileSource(boolean isContinuous) {
-        return new FileStoreSource(
-                table, isContinuous, discoveryIntervalMills(), 
projectedFields, predicate, limit);
+    private StaticFileStoreSource buildStaticFileSource() {
+        return new StaticFileStoreSource(table, projectedFields, predicate, 
limit);
+    }
+
+    private ContinuousFileStoreSource buildContinuousFileSource() {
+        return new ContinuousFileStoreSource(
+                table, discoveryIntervalMills(), projectedFields, predicate, 
limit);
     }
 
     private Source<RowData, ?, ?> buildSource() {
@@ -144,20 +148,20 @@ public class FlinkSourceBuilder {
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true);
+                return buildContinuousFileSource();
             } else {
                 if (startupMode != LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, 
StaticFileStoreSplitEnumerator>builder(
-                                buildFileSource(false))
+                                buildStaticFileSource())
                         .addSource(
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
                         .build();
             }
         } else {
-            return buildFileSource(false);
+            return buildStaticFileSource();
         }
     }
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
index 10c2b23e..789a1a77 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.store.connector.source;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 
 /**
@@ -26,15 +28,13 @@ import java.util.Collection;
  */
 public class PendingSplitsCheckpoint {
 
-    public static final long INVALID_SNAPSHOT = -1L;
-
     /** The splits in the checkpoint. */
     private final Collection<FileStoreSourceSplit> splits;
 
-    private final long currentSnapshotId;
+    private final @Nullable Long currentSnapshotId;
 
     public PendingSplitsCheckpoint(
-            Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
+            Collection<FileStoreSourceSplit> splits, @Nullable Long 
currentSnapshotId) {
         this.splits = splits;
         this.currentSnapshotId = currentSnapshotId;
     }
@@ -43,7 +43,7 @@ public class PendingSplitsCheckpoint {
         return splits;
     }
 
-    public long currentSnapshotId() {
+    public @Nullable Long currentSnapshotId() {
         return currentSnapshotId;
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
index b3df0aca..2f129ea2 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
@@ -31,6 +31,8 @@ import java.util.List;
 public class PendingSplitsCheckpointSerializer
         implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
 
+    private static final long INVALID_SNAPSHOT = -1;
+
     private final FileStoreSourceSplitSerializer splitSerializer;
 
     public PendingSplitsCheckpointSerializer(FileStoreSourceSplitSerializer 
splitSerializer) {
@@ -46,19 +48,24 @@ public class PendingSplitsCheckpointSerializer
     public byte[] serialize(PendingSplitsCheckpoint pendingSplitsCheckpoint) 
throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+
         view.writeInt(pendingSplitsCheckpoint.splits().size());
         for (FileStoreSourceSplit split : pendingSplitsCheckpoint.splits()) {
             byte[] bytes = splitSerializer.serialize(split);
             view.writeInt(bytes.length);
             view.write(bytes);
         }
-        view.writeLong(pendingSplitsCheckpoint.currentSnapshotId());
+
+        Long currentSnapshotId = pendingSplitsCheckpoint.currentSnapshotId();
+        view.writeLong(currentSnapshotId == null ? INVALID_SNAPSHOT : 
currentSnapshotId);
+
         return out.toByteArray();
     }
 
     @Override
     public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) 
throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
+
         int splitNumber = view.readInt();
         List<FileStoreSourceSplit> splits = new ArrayList<>(splitNumber);
         for (int i = 0; i < splitNumber; i++) {
@@ -67,7 +74,9 @@ public class PendingSplitsCheckpointSerializer
             view.readFully(bytes);
             splits.add(splitSerializer.deserialize(version, bytes));
         }
+
         long currentSnapshotId = view.readLong();
-        return new PendingSplitsCheckpoint(splits, currentSnapshotId);
+        return new PendingSplitsCheckpoint(
+                splits, currentSnapshotId == INVALID_SNAPSHOT ? null : 
currentSnapshotId);
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
similarity index 60%
rename from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
rename to 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index 573e5f98..2223bbad 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.Snapshot;
@@ -27,42 +26,30 @@ import 
org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 
 import javax.annotation.Nullable;
 
 import java.util.Collection;
 
-import static 
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** {@link Source} of file store. */
-public class FileStoreSource extends FlinkSource {
+/** Bounded {@link FlinkSource} for reading records. It does not monitor new 
snapshots. */
+public class StaticFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
 
-    private final boolean isContinuous;
-
-    private final long discoveryInterval;
-
-    public FileStoreSource(
+    public StaticFileStoreSource(
             FileStoreTable table,
-            boolean isContinuous,
-            long discoveryInterval,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
         super(table, projectedFields, predicate, limit);
         this.table = table;
-        this.isContinuous = isContinuous;
-        this.discoveryInterval = discoveryInterval;
     }
 
     @Override
     public Boundedness getBoundedness() {
-        return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED;
+        return Boundedness.BOUNDED;
     }
 
     @Override
@@ -78,32 +65,16 @@ public class FileStoreSource extends FlinkSource {
         Long snapshotId;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            DataFilePlan plan = isContinuous ? 
SnapshotEnumerator.startup(scan) : scan.plan();
+            DataTableScan.DataFilePlan plan = scan.plan();
             snapshotId = plan.snapshotId;
             splits = new FileStoreSourceSplitGenerator().createSplits(plan);
         } else {
             // restore from checkpoint
             snapshotId = checkpoint.currentSnapshotId();
-            if (snapshotId == INVALID_SNAPSHOT) {
-                snapshotId = null;
-            }
             splits = checkpoint.splits();
         }
 
-        // create enumerator from snapshotId and splits
-        if (isContinuous) {
-            long currentSnapshot = snapshotId == null ? 
Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
-            return new ContinuousFileSplitEnumerator(
-                    context,
-                    table.location(),
-                    scan.withIncremental(true), // the subsequent planning is 
all incremental
-                    table.options().changelogProducer(),
-                    splits,
-                    currentSnapshot,
-                    discoveryInterval);
-        } else {
-            Snapshot snapshot = snapshotId == null ? null : 
snapshotManager.snapshot(snapshotId);
-            return new StaticFileStoreSplitEnumerator(context, snapshot, 
splits);
-        }
+        Snapshot snapshot = snapshotId == null ? null : 
snapshotManager.snapshot(snapshotId);
+        return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 599417f1..1712ebde 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -30,9 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
-import static 
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** A SplitEnumerator implementation for bounded / batch {@link 
FileStoreSource} input. */
+/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} 
input. */
 public class StaticFileStoreSplitEnumerator
         implements SplitEnumerator<FileStoreSourceSplit, 
PendingSplitsCheckpoint> {
 
@@ -84,7 +82,7 @@ public class StaticFileStoreSplitEnumerator
     @Override
     public PendingSplitsCheckpoint snapshotState(long checkpointId) {
         return new PendingSplitsCheckpoint(
-                new ArrayList<>(splits), snapshot == null ? INVALID_SNAPSHOT : 
snapshot.id());
+                new ArrayList<>(splits), snapshot == null ? null : 
snapshot.id());
     }
 
     @Override
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 503f1009..77d7b460 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -53,10 +53,11 @@ import static 
org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
 import static 
org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
 
 /**
- * Table source to create {@link FileStoreSource} under batch mode or 
change-tracking is disabled.
- * For streaming mode with change-tracking enabled and FULL scan mode, it will 
create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link 
FileStoreSource} and kafka
- * log source created by {@link LogSourceProvider}.
+ * Table source to create {@link StaticFileStoreSource} or {@link 
ContinuousFileStoreSource} under
+ * batch mode or change-tracking is disabled. For streaming mode with 
change-tracking enabled and
+ * FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link 
StaticFileStoreSource} and
+ * kafka log source created by {@link LogSourceProvider}.
  */
 public class TableStoreSource extends FlinkTableSource
         implements LookupTableSource, SupportsWatermarkPushDown {
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 5b749d74..b4e8bb72 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -35,8 +35,9 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
 import org.apache.flink.table.store.connector.sink.StoreSink;
-import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.connector.source.ContinuousFileStoreSource;
 import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
+import org.apache.flink.table.store.connector.source.StaticFileStoreSource;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
@@ -77,7 +78,10 @@ import static org.apache.flink.table.store.CoreOptions.PATH;
 import static 
org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
+/**
+ * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} 
and {@link
+ * StoreSink}.
+ */
 @RunWith(Parameterized.class)
 public class FileStoreITCase extends AbstractTestBase {
 
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 0a5a3862..7548cb61 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -37,7 +37,7 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void serializeEmptyCheckpoint() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                new PendingSplitsCheckpoint(Collections.emptyList(), 5);
+                new PendingSplitsCheckpoint(Collections.emptyList(), 5L);
 
         final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
 
@@ -48,7 +48,7 @@ public class PendingSplitsCheckpointSerializerTest {
     public void serializeSomeSplits() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
                 new PendingSplitsCheckpoint(
-                        Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()), 3);
+                        Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()), 3L);
 
         final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
 
@@ -59,7 +59,7 @@ public class PendingSplitsCheckpointSerializerTest {
     public void serializeSplitsAndContinuous() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
                 new PendingSplitsCheckpoint(
-                        Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()), 20);
+                        Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()), 20L);
 
         final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
 
@@ -69,7 +69,7 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void repeatedSerialization() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                new PendingSplitsCheckpoint(Arrays.asList(testSplit3(), 
testSplit1()), 5);
+                new PendingSplitsCheckpoint(Arrays.asList(testSplit3(), 
testSplit1()), 5L);
 
         serializeAndDeserialize(checkpoint);
         serializeAndDeserialize(checkpoint);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 5332215f..802c1944 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -112,7 +112,6 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 manifestListFactory(),
                 options.bucket(),
                 checkNumOfBuckets,
-                options.changelogProducer(),
                 options.readCompacted());
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index b98e7211..e7e563e0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
@@ -61,7 +60,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final ManifestList manifestList;
     private final int numOfBuckets;
     private final boolean checkNumOfBuckets;
-    private final CoreOptions.ChangelogProducer changelogProducer;
     private final boolean readCompacted;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
@@ -74,7 +72,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Long specifiedSnapshotId = null;
     private Integer specifiedBucket = null;
     private List<ManifestFileMeta> specifiedManifests = null;
-    private boolean isIncremental = false;
+    private ScanKind scanKind = ScanKind.ALL;
     private Integer specifiedLevel = null;
 
     public AbstractFileStoreScan(
@@ -87,7 +85,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer,
             boolean readCompacted) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
@@ -101,7 +98,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
-        this.changelogProducer = changelogProducer;
         this.readCompacted = readCompacted;
         this.tableSchemas = new ConcurrentHashMap<>();
     }
@@ -167,8 +163,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withIncremental(boolean isIncremental) {
-        this.isIncremental = isIncremental;
+    public FileStoreScan withKind(ScanKind scanKind) {
+        this.scanKind = scanKind;
         return this;
     }
 
@@ -193,10 +189,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 manifests = Collections.emptyList();
             } else {
                 Snapshot snapshot = snapshotManager.snapshot(snapshotId);
-                manifests =
-                        isIncremental
-                                ? readIncremental(snapshot)
-                                : snapshot.readAllDataManifests(manifestList);
                manifests = readManifests(snapshot);
            }
        }
 
@@ -263,9 +256,13 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         };
     }
 
-    private List<ManifestFileMeta> readIncremental(Snapshot snapshot) {
-        switch (changelogProducer) {
-            case INPUT:
+    private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
+        switch (scanKind) {
+            case ALL:
+                return snapshot.readAllDataManifests(manifestList);
+            case DELTA:
+                return manifestList.read(snapshot.deltaManifestList());
+            case CHANGELOG:
                 if (snapshot.version() >= 2) {
                     if (snapshot.changelogManifestList() == null) {
                         return Collections.emptyList();
@@ -283,23 +280,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         String.format(
                                 "Incremental scan does not accept %s snapshot",
                                 snapshot.commitKind()));
-            case FULL_COMPACTION:
-                if (snapshot.changelogManifestList() == null) {
-                    return Collections.emptyList();
-                } else {
-                    return manifestList.read(snapshot.changelogManifestList());
-                }
-            case NONE:
-                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
-                    throw new IllegalStateException(
-                            String.format(
-                                    "Incremental scan does not accept %s 
snapshot",
-                                    snapshot.commitKind()));
-                }
-                return manifestList.read(snapshot.deltaManifestList());
             default:
-                throw new UnsupportedOperationException(
-                        "Unknown changelog producer " + 
changelogProducer.name());
+                throw new UnsupportedOperationException("Unknown scan kind " + 
scanKind.name());
         }
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index b1930f3d..86bcca3b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -68,7 +67,6 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
-                CoreOptions.ChangelogProducer.NONE,
                 readCompacted);
         this.schemaRowStatsConverters = new ConcurrentHashMap<>();
         this.rowType = rowType;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 0ce733b8..b9fe5f80 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -46,7 +46,7 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
 
-    FileStoreScan withIncremental(boolean isIncremental);
+    FileStoreScan withKind(ScanKind scanKind);
 
     FileStoreScan withLevel(int level);
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 3cb707b2..68e44f0a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -62,7 +61,6 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer,
             boolean readCompacted) {
         super(
                 partitionType,
@@ -74,7 +72,6 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
-                changelogProducer,
                 readCompacted);
         this.keyValueFieldsExtractor = keyValueFieldsExtractor;
         this.schemaKeyStatsConverters = new ConcurrentHashMap<>();
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
new file mode 100644
index 00000000..5c7fb78e
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+/** Specifies which part of a snapshot to scan. */
+public enum ScanKind {
+    /** Scan complete data files of a snapshot. */
+    ALL,
+    /** Only scan newly changed files of a snapshot. */
+    DELTA,
+    /** Only scan changelog files of a snapshot. */
+    CHANGELOG
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 2b8e288b..73e23986 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -47,7 +48,7 @@ public abstract class DataTableScan implements TableScan {
     private final FileStorePathFactory pathFactory;
     private final CoreOptions options;
 
-    private boolean isIncremental = false;
+    private ScanKind scanKind = ScanKind.ALL;
 
     protected DataTableScan(
             FileStoreScan scan,
@@ -93,9 +94,9 @@ public abstract class DataTableScan implements TableScan {
         return this;
     }
 
-    public DataTableScan withIncremental(boolean isIncremental) {
-        this.isIncremental = isIncremental;
-        scan.withIncremental(isIncremental);
+    public DataTableScan withKind(ScanKind scanKind) {
+        this.scanKind = scanKind;
+        scan.withKind(scanKind);
         return this;
     }
 
@@ -119,7 +120,8 @@ public abstract class DataTableScan implements TableScan {
 
     private List<DataSplit> generateSplits(
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
-        return generateSplits(isIncremental, splitGenerator(pathFactory), 
groupedDataFiles);
+        return generateSplits(
+                scanKind != ScanKind.ALL, splitGenerator(pathFactory), 
groupedDataFiles);
     }
 
     @VisibleForTesting
@@ -171,6 +173,7 @@ public abstract class DataTableScan implements TableScan {
             this.splits = splits;
         }
 
+        @SuppressWarnings({"unchecked", "rawtypes"})
         @Override
         public List<Split> splits() {
             return (List) splits;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
deleted file mode 100644
index 067ef812..00000000
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.concurrent.Callable;
-
-import static 
org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-
-/** Enumerator to enumerate incremental snapshots. */
-public class SnapshotEnumerator implements 
Callable<SnapshotEnumerator.EnumeratorResult> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotEnumerator.class);
-
-    private final SnapshotManager snapshotManager;
-    private final DataTableScan scan;
-    private final CoreOptions.ChangelogProducer changelogProducer;
-
-    private long nextSnapshotId;
-
-    public SnapshotEnumerator(
-            Path tablePath,
-            DataTableScan scan,
-            CoreOptions.ChangelogProducer changelogProducer,
-            long currentSnapshot) {
-        this.snapshotManager = new SnapshotManager(tablePath);
-        this.scan = scan;
-        this.changelogProducer = changelogProducer;
-
-        this.nextSnapshotId = currentSnapshot + 1;
-    }
-
-    @Nullable
-    @Override
-    public EnumeratorResult call() {
-        // TODO sync with processDiscoveredSplits to avoid too more splits in 
memory
-        while (true) {
-            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
-                // TODO check latest snapshot id, expired?
-                LOG.debug(
-                        "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
-                        nextSnapshotId);
-                return null;
-            }
-
-            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-
-            if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
-                LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
-                nextSnapshotId++;
-                continue;
-            }
-
-            if (changelogProducer == CoreOptions.ChangelogProducer.NONE
-                    && snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
-                LOG.debug(
-                        "ChangelogProducer is NONE. "
-                                + "Next snapshot id {} is not APPEND, but is 
{}, check next one.",
-                        nextSnapshotId,
-                        snapshot.commitKind());
-                nextSnapshotId++;
-                continue;
-            }
-
-            DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
-            EnumeratorResult result = new EnumeratorResult(nextSnapshotId, 
plan);
-            LOG.debug("Find snapshot id {}.", nextSnapshotId);
-
-            nextSnapshotId++;
-            return result;
-        }
-    }
-
-    /** Enumerator result. */
-    public static class EnumeratorResult {
-
-        public final long snapshotId;
-
-        public final DataFilePlan plan;
-
-        private EnumeratorResult(long snapshotId, DataFilePlan plan) {
-            this.snapshotId = snapshotId;
-            this.plan = plan;
-        }
-    }
-
-    /** Startup snapshot enumerator, this is the first plan for continuous 
reading. */
-    public static DataFilePlan startup(DataTableScan scan) {
-        CoreOptions options = scan.options();
-        SnapshotManager snapshotManager = scan.snapshotManager();
-        CoreOptions.LogStartupMode startupMode = options.logStartupMode();
-        switch (startupMode) {
-            case FULL:
-                DataFilePlan plan;
-                if (options.changelogProducer() == FULL_COMPACTION) {
-                    // Read the results of the last full compaction.
-                    // Only full compaction results will appear on the max 
level.
-                    plan = scan.withLevel(options.numLevels() - 1).plan();
-                } else {
-                    plan = scan.plan();
-                }
-                return plan;
-            case LATEST:
-                return new DataFilePlan(
-                        snapshotManager.latestSnapshotId(), 
Collections.emptyList());
-            case FROM_TIMESTAMP:
-                Long timestampMills = options.logScanTimestampMills();
-                if (timestampMills == null) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "%s can not be null when you use %s for 
%s",
-                                    CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
-                                    CoreOptions.LogStartupMode.FROM_TIMESTAMP,
-                                    CoreOptions.LOG_SCAN.key()));
-                }
-                return new DataFilePlan(
-                        snapshotManager.earlierThanTimeMills(timestampMills),
-                        Collections.emptyList());
-            default:
-                throw new UnsupportedOperationException("Unsupported startup 
mode: " + startupMode);
-        }
-    }
-}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index a00bed89..be2d05e5 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -19,11 +19,14 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateFilter;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.FileStoreTable;
+import 
org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.utils.TypeUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
@@ -47,14 +50,14 @@ public class TableStreamingReader {
     private final int[] projection;
     @Nullable private final Predicate predicate;
     @Nullable private final PredicateFilter recordFilter;
-
-    private SnapshotEnumerator enumerator;
+    private final SnapshotEnumerator enumerator;
 
     public TableStreamingReader(
             FileStoreTable table, int[] projection, @Nullable Predicate 
predicate) {
         this.table = table;
         this.projection = projection;
         this.predicate = predicate;
+
         if (predicate != null) {
             List<String> fieldNames = table.schema().fieldNames();
             List<String> primaryKeys = table.schema().primaryKeys();
@@ -79,34 +82,20 @@ public class TableStreamingReader {
         } else {
             recordFilter = null;
         }
+
+        DataTableScan scan = table.newScan();
+        if (predicate != null) {
+            scan.withFilter(predicate);
+        }
+        enumerator =
+                new DeltaSnapshotEnumerator(
+                        table.location(), scan, 
CoreOptions.LogStartupMode.FULL, null, null);
     }
 
     @Nullable
-    public Iterator<RowData> nextBatch() throws IOException {
-        if (enumerator == null) {
-            DataTableScan scan = table.newScan();
-            if (predicate != null) {
-                scan.withFilter(predicate);
-            }
-            DataTableScan.DataFilePlan plan = scan.plan();
-            if (plan.snapshotId == null) {
-                return null;
-            }
-            long snapshotId = plan.snapshotId;
-            enumerator =
-                    new SnapshotEnumerator(
-                            table.location(),
-                            scan.withIncremental(true),
-                            table.options().changelogProducer(),
-                            snapshotId);
-            return read(plan);
-        } else {
-            SnapshotEnumerator.EnumeratorResult result = enumerator.call();
-            if (result == null) {
-                return null;
-            }
-            return read(result.plan);
-        }
+    public Iterator<RowData> nextBatch() throws Exception {
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        return plan == null ? null : read(plan);
     }
 
     private Iterator<RowData> read(DataTableScan.DataFilePlan plan) throws 
IOException {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..c345eac5
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Abstract class for all {@link SnapshotEnumerator}s which enumerate
record-related data files. */
+public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final CoreOptions.LogStartupMode startupMode;
+    private @Nullable final Long startupMillis;
+
+    private @Nullable Long nextSnapshotId;
+
+    public DataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startupMode = startupMode;
+        this.startupMillis = startupMillis;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan enumerate() {
+        if (nextSnapshotId == null) {
+            return tryFirstEnumerate();
+        } else {
+            return nextEnumerate();
+        }
+    }
+
+    private DataTableScan.DataFilePlan tryFirstEnumerate() {
+        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Wait for the snapshot 
generation.");
+            return null;
+        }
+
+        DataTableScan.DataFilePlan plan;
+        switch (startupMode) {
+            case FULL:
+                plan = 
scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+                break;
+            case FROM_TIMESTAMP:
+                Preconditions.checkNotNull(
+                        startupMillis,
+                        String.format(
+                                "%s can not be null when you use %s for %s",
+                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                CoreOptions.LOG_SCAN.key()));
+                startingSnapshotId = 
snapshotManager.earlierThanTimeMills(startupMillis);
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, 
Collections.emptyList());
+                break;
+            case LATEST:
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, 
Collections.emptyList());
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown log startup mode " + startupMode.name());
+        }
+
+        nextSnapshotId = startingSnapshotId + 1;
+        return plan;
+    }
+
+    private DataTableScan.DataFilePlan nextEnumerate() {
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+
+            if (shouldReadSnapshot(snapshot)) {
+                LOG.debug("Find snapshot id {}.", nextSnapshotId);
+                DataTableScan.DataFilePlan plan = 
getPlan(scan.withSnapshot(nextSnapshotId));
+                nextSnapshotId++;
+                return plan;
+            } else {
+                nextSnapshotId++;
+            }
+        }
+    }
+
+    protected abstract boolean shouldReadSnapshot(Snapshot snapshot);
+
+    protected abstract DataTableScan.DataFilePlan getPlan(DataTableScan scan);
+
+    public static DataFileSnapshotEnumerator create(
+            FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
+        Path location = table.location();
+        CoreOptions.LogStartupMode startupMode = 
table.options().logStartupMode();
+        Long startupMillis = table.options().logScanTimestampMills();
+
+        switch (table.options().changelogProducer()) {
+            case NONE:
+                return new DeltaSnapshotEnumerator(
+                        location, scan, startupMode, startupMillis, 
nextSnapshotId);
+            case INPUT:
+                return new InputChangelogSnapshotEnumerator(
+                        location, scan, startupMode, startupMillis, 
nextSnapshotId);
+            case FULL_COMPACTION:
+                return new FullCompactionChangelogSnapshotEnumerator(
+                        location,
+                        scan,
+                        table.options().numLevels() - 1,
+                        startupMode,
+                        startupMillis,
+                        nextSnapshotId);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown changelog producer " + 
table.options().changelogProducer().name());
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
new file mode 100644
index 00000000..857ef88d
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in 
{@link
+ * Snapshot#deltaManifestList()} for each newly created snapshot.
+ */
+public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DeltaSnapshotEnumerator.class);
+
+    public DeltaSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
+    }
+
+    @Override
+    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+            return true;
+        }
+
+        LOG.debug(
+                "Next snapshot id {} is not APPEND, but is {}, check next 
one.",
+                snapshot.id(),
+                snapshot.commitKind());
+        return false;
+    }
+
+    @Override
+    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+        return scan.withKind(ScanKind.DELTA).plan();
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
new file mode 100644
index 00000000..85c90f72
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in 
{@link
+ * Snapshot#changelogManifestList()} for each newly created snapshot.
+ *
+ * <p>This enumerator looks for changelog files produced from full compaction. 
Can only be used when
+ * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link
+ * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ */
+public class FullCompactionChangelogSnapshotEnumerator extends 
DataFileSnapshotEnumerator {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FullCompactionChangelogSnapshotEnumerator.class);
+
+    public FullCompactionChangelogSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            int maxLevel,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis, 
nextSnapshotId);
+    }
+
+    @Override
+    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+        if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+            return true;
+        }
+
+        LOG.debug(
+                "Next snapshot id {} is not COMPACT, but is {}, check next 
one.",
+                snapshot.id(),
+                snapshot.commitKind());
+        return false;
+    }
+
+    @Override
+    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+        return scan.withKind(ScanKind.CHANGELOG).plan();
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
new file mode 100644
index 00000000..03f221bf
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in 
{@link
+ * Snapshot#changelogManifestList()} for each newly created snapshot.
+ *
+ * <p>This enumerator looks for changelog files produced directly from input. 
Can only be used when
+ * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link 
CoreOptions.ChangelogProducer#INPUT}.
+ */
+public class InputChangelogSnapshotEnumerator extends 
DataFileSnapshotEnumerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(InputChangelogSnapshotEnumerator.class);
+
+    public InputChangelogSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            Long nextSnapshotId) {
+        super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
+    }
+
+    @Override
+    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+            return true;
+        }
+
+        LOG.debug(
+                "Next snapshot id {} is not APPEND, but is {}, check next 
one.",
+                snapshot.id(),
+                snapshot.commitKind());
+        return false;
+    }
+
+    @Override
+    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+        return scan.withKind(ScanKind.CHANGELOG).plan();
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
similarity index 50%
copy from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
copy to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
index 10c2b23e..93c1cb08 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
@@ -16,34 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.connector.source;
-
-import java.util.Collection;
-
-/**
- * A checkpoint of the current state of the containing the currently pending 
splits that are not yet
- * assigned.
- */
-public class PendingSplitsCheckpoint {
-
-    public static final long INVALID_SNAPSHOT = -1L;
-
-    /** The splits in the checkpoint. */
-    private final Collection<FileStoreSourceSplit> splits;
-
-    private final long currentSnapshotId;
-
-    public PendingSplitsCheckpoint(
-            Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
-        this.splits = splits;
-        this.currentSnapshotId = currentSnapshotId;
-    }
-
-    public Collection<FileStoreSourceSplit> splits() {
-        return splits;
-    }
-
-    public long currentSnapshotId() {
-        return currentSnapshotId;
-    }
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import javax.annotation.Nullable;
+
+/** Enumerate incremental changes from newly created snapshots. */
+public interface SnapshotEnumerator {
+
+    /**
+     * The first call to this method will produce a {@link 
DataTableScan.DataFilePlan} containing
+     * the base files for the following incremental changes (or just return 
null if there are no
+     * base files).
+     *
+     * <p>Following calls to this method will produce {@link 
DataTableScan.DataFilePlan}s containing
+     * incremental changed files. If there is currently no newer snapshots, 
null will be returned
+     * instead.
+     */
+    @Nullable
+    DataTableScan.DataFilePlan enumerate();
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index be0c1a9d..d8d46e03 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -270,7 +271,15 @@ public class TestFileStore extends KeyValueFileStore {
                 snapshotId <= endInclusive;
                 snapshotId++) {
             List<ManifestEntry> entries =
-                    
newScan().withIncremental(true).withSnapshot(snapshotId).plan().files();
+                    newScan()
+                            .withKind(
+                                    options.changelogProducer()
+                                                    == 
CoreOptions.ChangelogProducer.NONE
+                                            ? ScanKind.DELTA
+                                            : ScanKind.CHANGELOG)
+                            .withSnapshot(snapshotId)
+                            .plan()
+                            .files();
             result.addAll(readKvsFromManifestEntries(entries, true));
         }
         return result;
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 7e4ea531..c8332af7 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -115,7 +116,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -133,7 +134,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -149,7 +150,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         Predicate predicate = builder.equal(2, 101L);
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+                
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 24067da5..be4a59aa 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -103,7 +104,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -123,7 +124,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -139,7 +140,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
 
         Predicate predicate = builder.equal(2, 201L);
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+                
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -190,7 +191,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         write.close();
 
         // check that no data file is produced
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         assertThat(splits).isEmpty();
         // check that no changelog file is produced
         Path bucketPath = DataFilePathFactory.bucketPath(table.location(), 
"1", 0);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
index 8c8dbfae..066e1516 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -140,7 +141,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_0_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
                     // filter with "b" = 15 in schema0
                     TableRead read = 
table.newRead().withFilter(builder.equal(2, 15));
 
@@ -157,7 +158,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_1_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
 
                     // filter with "d" = 15 in schema1 which should be mapped 
to "b" = 15 in schema0
                     /// TODO: changelog with key only supports to filter on key
@@ -191,7 +192,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_0_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
                     // filter with "kt" = 116 in schema0
                     TableRead read = 
table.newRead().withFilter(builder.equal(4, 116));
 
@@ -205,7 +206,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_1_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
 
                     // filter with "kt" = 120 in schema1
                     TableRead read = 
table.newRead().withFilter(builder.equal(1, 120));
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 7dd1f502..00feaa6a 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
-import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -35,9 +35,11 @@ import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import 
org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.utils.CompatibilityTestUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -166,7 +168,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -185,7 +187,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
 
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -202,7 +204,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+                
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -233,7 +235,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
@@ -267,7 +269,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.compact(binaryRow(2), 0, true);
         commit.commit(0, write.prepareCommit(true, 0));
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+        List<Split> splits = 
table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
@@ -288,7 +290,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.compact(binaryRow(2), 0, true);
         commit.commit(2, write.prepareCommit(true, 2));
 
-        splits = table.newScan().withIncremental(true).plan().splits();
+        splits = table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder("+I 
1|30|130|binary|varbinary|mapKey:mapVal|multiset");
         assertThat(getResult(read, splits, binaryRow(2), 0, 
CHANGELOG_ROW_TO_STRING))
@@ -312,7 +314,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.compact(binaryRow(2), 0, true);
         commit.commit(4, write.prepareCommit(true, 4));
 
-        splits = table.newScan().withIncremental(true).plan().splits();
+        splits = table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
                         "-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
@@ -371,18 +373,15 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 Collections.singletonList("-D 
2|10|301|binary|varbinary")));
 
         SnapshotEnumerator enumerator =
-                new SnapshotEnumerator(
-                        tablePath,
-                        (DataTableScan) table.newScan().withIncremental(true),
-                        ChangelogProducer.INPUT,
-                        Snapshot.FIRST_SNAPSHOT_ID - 1);
+                new InputChangelogSnapshotEnumerator(
+                        tablePath, table.newScan(), 
CoreOptions.LogStartupMode.FULL, null, 1L);
 
         FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
                 i -> {
-                    SnapshotEnumerator.EnumeratorResult result = 
enumerator.call();
-                    assertThat(result).isNotNull();
+                    TableScan.Plan plan = enumerator.enumerate();
+                    assertThat(plan).isNotNull();
 
-                    List<Split> splits = result.plan.splits();
+                    List<Split> splits = plan.splits();
                     TableRead read = table.newRead();
                     for (int j = 0; j < 2; j++) {
                         assertThat(
@@ -403,7 +402,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         }
 
         // no more changelog
-        assertThat(enumerator.call()).isNull();
+        assertThat(enumerator.enumerate()).isNull();
 
         // write another commit
         TableWrite write = table.newWrite(commitUser);
@@ -418,7 +417,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         assertNextSnapshot.apply(2);
 
         // no more changelog
-        assertThat(enumerator.call()).isNull();
+        assertThat(enumerator.enumerate()).isNull();
     }
 
     private void writeData() throws Exception {
@@ -591,7 +590,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 20, 200L));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        DataTableScan scan = table.newScan().withIncremental(true);
+        DataTableScan scan = table.newScan().withKind(ScanKind.DELTA);
         List<DataSplit> splits0 = scan.plan().splits;
         assertThat(splits0).hasSize(1);
         assertThat(splits0.get(0).files()).hasSize(1);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
index 21321635..47dadf70 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Equal;
 import org.apache.flink.table.store.file.predicate.IsNull;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
@@ -340,7 +341,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
         writeAndCheckFileResult(
                 schemas -> {
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
                     TableRead read = table.newRead();
 
                     assertThat(getResult(read, splits, 
STREAMING_SCHEMA_0_ROW_TO_STRING))
@@ -353,7 +354,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                 },
                 (files, schemas) -> {
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
 
                     TableRead read = table.newRead();
                     assertThat(getResult(read, splits, 
STREAMING_SCHEMA_1_ROW_TO_STRING))
@@ -373,7 +374,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
         writeAndCheckFileResult(
                 schemas -> {
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
                     // project "c", "b", "pt" in schema0
                     TableRead read = 
table.newRead().withProjection(PROJECTION);
 
@@ -384,7 +385,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                 },
                 (files, schemas) -> {
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
 
                     // project "a", "kt", "d" in schema1
                     TableRead read = 
table.newRead().withProjection(PROJECTION);
@@ -404,7 +405,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_0_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
                     // filter with "b" = 15 in schema0
                     TableRead read = 
table.newRead().withFilter(builder.equal(2, 15));
 
@@ -418,7 +419,7 @@ public abstract class FileDataFilterTestBase extends 
SchemaEvolutionTableTestBas
                     PredicateBuilder builder =
                             new PredicateBuilder(RowDataType.toRowType(false, 
SCHEMA_1_FIELDS));
                     FileStoreTable table = createFileStoreTable(schemas);
-                    List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
+                    List<Split> splits = 
table.newScan().withKind(ScanKind.DELTA).plan().splits();
 
                     // filter with "d" = 15 in schema1 which should be mapped 
to "b" = 15 in schema0
                     TableRead read1 = 
table.newRead().withFilter(builder.equal(1, 15));
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
new file mode 100644
index 00000000..916df0cc
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Base test class for {@link DataFileSnapshotEnumerator}.
+ *
+ * <p>TODO: merge this class with FileStoreTableTestBase.
+ */
+public abstract class DataFileSnapshotEnumeratorTestBase {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.BIGINT().getLogicalType()
+                    },
+                    new String[] {"pt", "a", "b"});
+
+    @TempDir java.nio.file.Path tempDir;
+
+    protected Path tablePath;
+    protected String commitUser;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" + 
tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    protected GenericRowData rowData(Object... values) {
+        return GenericRowData.of(values);
+    }
+
+    protected GenericRowData rowDataWithKind(RowKind rowKind, Object... 
values) {
+        return GenericRowData.ofKind(rowKind, values);
+    }
+
+    protected BinaryRowData binaryRow(int a) {
+        BinaryRowData b = new BinaryRowData(1);
+        BinaryRowWriter writer = new BinaryRowWriter(b);
+        writer.writeInt(0, a);
+        writer.complete();
+        return b;
+    }
+
+    protected List<String> getResult(TableRead read, List<Split> splits) 
throws Exception {
+        List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new 
ArrayList<>();
+        for (Split split : splits) {
+            readers.add(() -> read.createReader(split));
+        }
+        RecordReader<RowData> recordReader = 
ConcatRecordReader.create(readers);
+        RecordReaderIterator<RowData> iterator = new 
RecordReaderIterator<>(recordReader);
+        List<String> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            RowData rowData = iterator.next();
+            result.add(
+                    String.format(
+                            "%s %d|%d|%d",
+                            rowData.getRowKind().shortString(),
+                            rowData.getInt(0),
+                            rowData.getInt(1),
+                            rowData.getLong(2)));
+        }
+        iterator.close();
+        return result;
+    }
+
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Configuration conf = getConf();
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema tableSchema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "a"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(tablePath, tableSchema, conf);
+    }
+
+    protected abstract Configuration getConf();
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..65fe9f53
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DeltaSnapshotEnumerator}. */
+public class DeltaSnapshotEnumeratorTest extends 
DataFileSnapshotEnumeratorTestBase {
+
+    @Test
+    public void testFullStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                new DeltaSnapshotEnumerator(
+                        tablePath, table.newScan(), 
CoreOptions.LogStartupMode.FULL, null, null);
+
+        // first call without any snapshot, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write base data
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // first call with snapshot, should return complete records from 2nd 
commit
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
+
+        // incremental call without new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write incremental data
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        // first incremental call, should return incremental records from 3rd 
commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", 
"+I 1|40|400"));
+
+        // second incremental call, should return incremental records from 4th 
commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", 
"+I 1|50|500"));
+
+        // no more new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testFromTimestampStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // log current time millis, we'll start from here
+        Thread.sleep(50);
+        long startMillis = System.currentTimeMillis();
+        Thread.sleep(50);
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // first call with snapshot, should return empty plan
+        SnapshotEnumerator enumerator =
+                new DeltaSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        startMillis,
+                        null);
+
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(plan.splits()).isEmpty();
+
+        // first incremental call, should return incremental records from 2nd 
commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", 
"-D 1|40|400"));
+
+        // second incremental call, should return incremental records from 3rd 
commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", 
"+I 1|40|400"));
+
+        // no new snapshots
+        assertThat(enumerator.enumerate()).isNull();
+
+        // more incremental records
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", 
"+I 1|50|500"));
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testLatestStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        SnapshotEnumerator enumerator =
+                new DeltaSnapshotEnumerator(
+                        tablePath, table.newScan(), 
CoreOptions.LogStartupMode.LATEST, null, null);
+
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(plan.splits()).isEmpty();
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", 
"+I 1|40|400"));
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", 
"+I 1|50|500"));
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Override
+    protected Configuration getConf() {
+        return new Configuration();
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..72531f4f
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FullCompactionChangelogSnapshotEnumerator}. */
+public class FullCompactionChangelogSnapshotEnumeratorTest
+        extends DataFileSnapshotEnumeratorTestBase {
+
+    @Test
+    public void testFullStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                new FullCompactionChangelogSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        table.options().numLevels() - 1,
+                        CoreOptions.LogStartupMode.FULL,
+                        null,
+                        null);
+
+        // first call without any snapshot, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write base data
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // some more records
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // first call with snapshot, should return full compacted records from 
3rd commit
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
+
+        // incremental call without new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write incremental data
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        // no new compact snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+
+        // full compaction done, read new changelog
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(6);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "-U 1|10|101",
+                                "+U 1|10|103",
+                                "-U 1|20|200",
+                                "+U 1|20|201",
+                                "+I 1|50|500"));
+
+        // no more new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testFromTimestampStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // log current time millis, we'll start from here
+        Thread.sleep(50);
+        long startMillis = System.currentTimeMillis();
+        Thread.sleep(50);
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        SnapshotEnumerator enumerator =
+                new FullCompactionChangelogSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        table.options().numLevels() - 1,
+                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        startMillis,
+                        null);
+
+        // first call, should return empty plan
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(plan.splits()).isEmpty();
+
+        // first incremental call, no new compact snapshot, should be null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(6);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "-U 1|10|101",
+                                "+U 1|10|103",
+                                "-U 1|20|200",
+                                "+U 1|20|201",
+                                "+I 1|50|500"));
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testLatestStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        SnapshotEnumerator enumerator =
+                new FullCompactionChangelogSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        table.options().numLevels() - 1,
+                        CoreOptions.LogStartupMode.LATEST,
+                        null,
+                        null);
+
+        // first call, should be empty plan
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(plan.splits()).isEmpty();
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // first incremental call, no new compact snapshot, should be null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(6);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "-U 1|10|101",
+                                "+U 1|10|103",
+                                "-U 1|20|200",
+                                "+U 1|20|201",
+                                "+I 1|50|500"));
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Override
+    protected Configuration getConf() {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, 
CoreOptions.ChangelogProducer.FULL_COMPACTION);
+        return conf;
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..5760110c
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link InputChangelogSnapshotEnumerator}. */
+public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestBase {
+
+    @Test
+    public void testFullStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                new InputChangelogSnapshotEnumerator(
+                        tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, null);
+
+        // first call without any snapshot, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write base data
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // first call with snapshot, should return complete records from 2nd commit
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+        // incremental call without new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        // write incremental data
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        // first incremental call, should return incremental records from 3rd commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
+
+        // second incremental call, should return incremental records from 4th commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
+
+        // no more new snapshots, should return null
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testFromTimestampStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // log current time millis, we'll start from here
+        Thread.sleep(50);
+        long startMillis = System.currentTimeMillis();
+        Thread.sleep(50);
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // first call with snapshot, should return empty plan
+        SnapshotEnumerator enumerator =
+                new InputChangelogSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        startMillis,
+                        null);
+
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(plan.splits()).isEmpty();
+
+        // first incremental call, should return incremental records from 2nd commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", "-D 1|40|400"));
+
+        // second incremental call, should return incremental records from 3rd commit
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
+
+        // no new snapshots
+        assertThat(enumerator.enumerate()).isNull();
+
+        // more incremental records
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", 
"+I 1|50|500"));
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testLatestStartupMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableRead read = table.newRead();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        SnapshotEnumerator enumerator =
+                new InputChangelogSnapshotEnumerator(
+                        tablePath, table.newScan(), CoreOptions.LogStartupMode.LATEST, null, null);
+
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(plan.splits()).isEmpty();
+
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.write(rowData(1, 50, 500L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+
+    @Override
+    protected Configuration getConf() {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        return conf;
+    }
+}


Reply via email to