This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e580bda8 [core] Clean TableScan related codes (#805)
7e580bda8 is described below

commit 7e580bda8c786b1037d983b8d523e73f6e6e6dc7
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 4 13:24:06 2023 +0800

    [core] Clean TableScan related codes (#805)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   9 ++
 .../paimon/table/AbstractFileStoreTable.java       |  33 ++++---
 .../java/org/apache/paimon/table/DataTable.java    |   8 --
 ...aTableScan.java => AbstractInnerTableScan.java} |  13 ++-
 .../{BatchDataTableScan.java => DataFilePlan.java} |  30 +++---
 .../apache/paimon/table/source/DataTableScan.java  |  65 -------------
 ...ScanImpl.java => InnerStreamTableScanImpl.java} |  83 ++++------------
 .../apache/paimon/table/source/InnerTableScan.java |  10 --
 ...aTableScanImpl.java => InnerTableScanImpl.java} |  34 +------
 .../paimon/table/source/StreamDataTableScan.java   |  52 -----------
 .../CompactionChangelogFollowUpScanner.java        |   8 +-
 .../ContinuousCompactorFollowUpScanner.java        |   8 +-
 .../source/snapshot/DeltaFollowUpScanner.java      |   8 +-
 .../table/source/snapshot/FollowUpScanner.java     |  14 +--
 .../snapshot/InputChangelogFollowUpScanner.java    |   8 +-
 .../table/source/snapshot/SnapshotSplitReader.java |  46 ---------
 .../source/snapshot/SnapshotSplitReaderImpl.java   |  44 ++++++++-
 .../table/source/snapshot/StartingScanner.java     |   8 +-
 .../StaticFromSnapshotStartingScanner.java         |   3 +-
 .../apache/paimon/table/system/AuditLogTable.java  |  98 +++----------------
 .../apache/paimon/table/system/BucketsTable.java   |  11 +--
 .../org/apache/paimon/table/system/FilesTable.java |  24 ++---
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   9 +-
 .../paimon/table/source/StartupModeTest.java       | 104 ++++++++++-----------
 ...TableScanTest.java => StreamTableScanTest.java} |  20 ++--
 ...chDataTableScanTest.java => TableScanTest.java} |   8 +-
 .../CompactionChangelogFollowUpScannerTest.java    |   4 +-
 .../ContinuousCompactorFollowUpScannerTest.java    |   4 +-
 .../source/snapshot/DeltaFollowUpScannerTest.java  |   4 +-
 .../InputChangelogFollowUpScannerTest.java         |   4 +-
 .../flink/lookup/FileStoreLookupFunction.java      |   1 -
 .../paimon/flink/lookup}/TableStreamingReader.java |  30 ++++--
 .../flink/source/CompactorSourceBuilder.java       |  36 +++++--
 .../flink/source/ContinuousFileStoreSource.java    |  16 +---
 .../paimon/flink/source/StaticFileStoreSource.java |  14 +--
 .../apache/paimon/flink/utils/TableScanUtils.java  |  56 +----------
 .../paimon/flink/action/CompactActionITCase.java   |  17 ++--
 .../flink/action/DropPartitionActionITCase.java    |   6 +-
 .../flink/sink/CommitterOperatorTestBase.java      |   2 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     |  14 +--
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  |   4 +-
 .../paimon/flink/source/CompactorSourceITCase.java |  27 +++++-
 .../source/ContinuousFileSplitEnumeratorTest.java  |  74 +++------------
 .../source/FileStoreSourceSplitGeneratorTest.java  |   8 +-
 .../paimon/hive/mapred/PaimonInputFormat.java      |   9 +-
 45 files changed, 376 insertions(+), 712 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 865ba1b3a..ae188173b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -559,6 +559,15 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Full compaction will be constantly triggered 
after delta commits.");
 
+    @ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<Boolean> STREAMING_COMPACT =
+            key("streaming-compact")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Only used to force TableScan to construct 
'ContinuousCompactorStartingScanner' and "
+                                    + "'ContinuousCompactorFollowUpScanner' 
for dedicated streaming compaction job.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aea57e2c8..7881d113a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -31,16 +31,17 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.BatchDataTableScanImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.InnerTableScanImpl;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScanImpl;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -78,14 +79,13 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public BatchDataTableScan newScan() {
-        return new BatchDataTableScanImpl(
-                coreOptions(), newSnapshotSplitReader(), snapshotManager());
+    public InnerTableScan newScan() {
+        return new InnerTableScanImpl(coreOptions(), newSnapshotSplitReader(), 
snapshotManager());
     }
 
     @Override
-    public StreamDataTableScan newStreamScan() {
-        return new StreamDataTableScanImpl(
+    public InnerStreamTableScan newStreamScan() {
+        return new InnerStreamTableScanImpl(
                 coreOptions(),
                 newSnapshotSplitReader(),
                 snapshotManager(),
@@ -103,7 +103,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         // check option is not immutable
-        Map<String, String> options = tableSchema.options();
+        Map<String, String> options = new HashMap<>(tableSchema.options());
         dynamicOptions.forEach(
                 (k, v) -> {
                     if (!Objects.equals(v, options.get(k))) {
@@ -111,10 +111,17 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                     }
                 });
 
-        Options newOptions = Options.fromMap(options);
+        // merge non-null dynamic options into schema.options
+        dynamicOptions.forEach(
+                (k, v) -> {
+                    if (v == null) {
+                        options.remove(k);
+                    } else {
+                        options.put(k, v);
+                    }
+                });
 
-        // merge dynamic options into schema.options
-        dynamicOptions.forEach(newOptions::setString);
+        Options newOptions = Options.fromMap(options);
 
         // set path always
         newOptions.set(PATH, path.toString());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index f3b7f92af..0ab7d97cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -31,12 +29,6 @@ public interface DataTable extends InnerTable {
 
     SnapshotSplitReader newSnapshotSplitReader();
 
-    @Override
-    BatchDataTableScan newScan();
-
-    @Override
-    StreamDataTableScan newStreamScan();
-
     CoreOptions coreOptions();
 
     SnapshotManager snapshotManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
similarity index 89%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 345535d7e..14066239f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
+import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
 import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
@@ -37,18 +38,18 @@ import org.apache.paimon.utils.Preconditions;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split 
generation. */
-public abstract class AbstractDataTableScan implements DataTableScan {
+public abstract class AbstractInnerTableScan implements InnerTableScan {
 
     private final CoreOptions options;
     protected final SnapshotSplitReader snapshotSplitReader;
 
-    protected AbstractDataTableScan(CoreOptions options, SnapshotSplitReader 
snapshotSplitReader) {
+    protected AbstractInnerTableScan(CoreOptions options, SnapshotSplitReader 
snapshotSplitReader) {
         this.options = options;
         this.snapshotSplitReader = snapshotSplitReader;
     }
 
     @VisibleForTesting
-    public AbstractDataTableScan withBucket(int bucket) {
+    public AbstractInnerTableScan withBucket(int bucket) {
         snapshotSplitReader.withBucket(bucket);
         return this;
     }
@@ -58,6 +59,12 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
     }
 
     protected StartingScanner createStartingScanner(boolean isStreaming) {
+        if (options.toConfiguration().get(CoreOptions.STREAMING_COMPACT)) {
+            Preconditions.checkArgument(
+                    isStreaming, "Set 'streaming-compact' in batch mode. This 
is unexpected.");
+            return new ContinuousCompactorStartingScanner();
+        }
+
         CoreOptions.StartupMode startupMode = options.startupMode();
         switch (startupMode) {
             case LATEST_FULL:
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
 b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
similarity index 59%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index a25f2dfae..380325b9d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -18,25 +18,29 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
 
-/** {@link DataTableScan} for batch planning. */
-public interface BatchDataTableScan extends DataTableScan {
+import javax.annotation.Nullable;
 
-    @Override
-    BatchDataTableScan withSnapshot(long snapshotId);
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
-    @Override
-    BatchDataTableScan withFilter(Predicate predicate);
+/** Scanning plan containing snapshot ID and input splits. */
+public class DataFilePlan implements TableScan.Plan {
 
-    @Override
-    BatchDataTableScan withKind(ScanKind scanKind);
+    private final List<DataSplit> splits;
+
+    public DataFilePlan(List<DataSplit> splits) {
+        this.splits = splits;
+    }
 
     @Override
-    BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+    public List<Split> splits() {
+        return new ArrayList<>(splits);
+    }
 
-    BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
+    public static DataFilePlan fromResult(@Nullable StartingScanner.Result 
result) {
+        return new DataFilePlan(result == null ? Collections.emptyList() : 
result.splits());
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
deleted file mode 100644
index c5249c5bc..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-
-/** A {@link TableScan} for reading data. */
-public interface DataTableScan extends InnerTableScan {
-
-    DataTableScan withKind(ScanKind kind);
-
-    DataTableScan withSnapshot(long snapshotId);
-
-    DataTableScan withLevelFilter(Filter<Integer> levelFilter);
-
-    DataTableScan withFilter(Predicate predicate);
-
-    DataTableScan.DataFilePlan plan();
-
-    /** Scanning plan containing snapshot ID and input splits. */
-    class DataFilePlan implements Plan {
-
-        public final List<DataSplit> splits;
-
-        @VisibleForTesting
-        public DataFilePlan(List<DataSplit> splits) {
-            this.splits = splits;
-        }
-
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        @Override
-        public List<Split> splits() {
-            return (List) splits;
-        }
-
-        public static DataFilePlan fromResult(@Nullable StartingScanner.Result 
result) {
-            return new DataFilePlan(result == null ? Collections.emptyList() : 
result.splits());
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
similarity index 72%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index afa4a302b..46c01b219 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -20,18 +20,15 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
+import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -41,10 +38,11 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 
-/** {@link DataTableScan} for streaming planning. */
-public class StreamDataTableScanImpl extends AbstractDataTableScan implements 
StreamDataTableScan {
+/** {@link StreamTableScan} implementation for streaming planning. */
+public class InnerStreamTableScanImpl extends AbstractInnerTableScan
+        implements InnerStreamTableScan {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(StreamDataTableScan.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(InnerStreamTableScanImpl.class);
 
     private final CoreOptions options;
     private final SnapshotManager snapshotManager;
@@ -56,7 +54,7 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
     private boolean isEnd = false;
     @Nullable private Long nextSnapshotId;
 
-    public StreamDataTableScanImpl(
+    public InnerStreamTableScanImpl(
             CoreOptions options,
             SnapshotSplitReader snapshotSplitReader,
             SnapshotManager snapshotManager,
@@ -68,63 +66,13 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
     }
 
     @Override
-    public StreamDataTableScanImpl withSnapshot(long snapshotId) {
-        snapshotSplitReader.withSnapshot(snapshotId);
-        return this;
-    }
-
-    @Override
-    public StreamDataTableScanImpl withFilter(Predicate predicate) {
+    public InnerStreamTableScanImpl withFilter(Predicate predicate) {
         snapshotSplitReader.withFilter(predicate);
         return this;
     }
 
     @Override
-    public StreamDataTableScanImpl withKind(ScanKind scanKind) {
-        snapshotSplitReader.withKind(scanKind);
-        return this;
-    }
-
-    @Override
-    public StreamDataTableScanImpl withLevelFilter(Filter<Integer> 
levelFilter) {
-        snapshotSplitReader.withLevelFilter(levelFilter);
-        return this;
-    }
-
-    @Override
-    public boolean supportStreamingReadOverwrite() {
-        return supportStreamingReadOverwrite;
-    }
-
-    @Override
-    public StreamDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
-        this.startingScanner = startingScanner;
-        return this;
-    }
-
-    @Override
-    public StreamDataTableScan withFollowUpScanner(FollowUpScanner 
followUpScanner) {
-        this.followUpScanner = followUpScanner;
-        return this;
-    }
-
-    @Override
-    public StreamDataTableScan withBoundedChecker(BoundedChecker 
boundedChecker) {
-        this.boundedChecker = boundedChecker;
-        return this;
-    }
-
-    @Override
-    public StreamDataTableScan withSnapshotStarting() {
-        startingScanner =
-                options.startupMode() == CoreOptions.StartupMode.COMPACTED_FULL
-                        ? new CompactedStartingScanner()
-                        : new FullStartingScanner();
-        return this;
-    }
-
-    @Override
-    public DataFilePlan plan() {
+    public Plan plan() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(true);
         }
@@ -142,7 +90,7 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
         }
     }
 
-    private DataFilePlan tryFirstPlan() {
+    private Plan tryFirstPlan() {
         StartingScanner.Result result = startingScanner.scan(snapshotManager, 
snapshotSplitReader);
         if (result != null) {
             long snapshotId = result.snapshotId();
@@ -154,7 +102,7 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
         return DataFilePlan.fromResult(result);
     }
 
-    private DataFilePlan nextPlan() {
+    private Plan nextPlan() {
         while (true) {
             if (isEnd) {
                 throw new EndOfScanException();
@@ -175,17 +123,16 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
 
             // first check changes of overwrite
             if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
-                    && supportStreamingReadOverwrite()) {
+                    && supportStreamingReadOverwrite) {
                 LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
-                DataTableScan.DataFilePlan overwritePlan =
+                Plan overwritePlan =
                         followUpScanner.getOverwriteChangesPlan(
                                 nextSnapshotId, snapshotSplitReader);
                 nextSnapshotId++;
                 return overwritePlan;
             } else if (followUpScanner.shouldScanSnapshot(snapshot)) {
                 LOG.debug("Find snapshot id {}.", nextSnapshotId);
-                DataTableScan.DataFilePlan plan =
-                        followUpScanner.scan(nextSnapshotId, 
snapshotSplitReader);
+                Plan plan = followUpScanner.scan(nextSnapshotId, 
snapshotSplitReader);
                 nextSnapshotId++;
                 return plan;
             } else {
@@ -195,6 +142,10 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
     }
 
     private FollowUpScanner createFollowUpScanner() {
+        if (options.toConfiguration().get(CoreOptions.STREAMING_COMPACT)) {
+            return new ContinuousCompactorFollowUpScanner();
+        }
+
         CoreOptions.ChangelogProducer changelogProducer = 
options.changelogProducer();
         FollowUpScanner followUpScanner;
         switch (changelogProducer) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 8c485743e..5d78e6e2a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -19,19 +19,9 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-
-import java.util.List;
 
 /** Inner {@link TableScan} contains filter push down. */
 public interface InnerTableScan extends TableScan {
 
-    default InnerTableScan withFilter(List<Predicate> predicates) {
-        if (predicates == null || predicates.isEmpty()) {
-            return this;
-        }
-        return withFilter(PredicateBuilder.and(predicates));
-    }
-
     InnerTableScan withFilter(Predicate predicate);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
similarity index 67%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index d0dead613..e47c22ed5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,15 +19,13 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
-/** {@link DataTableScan} for batch planning. */
-public class BatchDataTableScanImpl extends AbstractDataTableScan implements 
BatchDataTableScan {
+/** {@link TableScan} implementation for batch planning. */
+public class InnerTableScanImpl extends AbstractInnerTableScan {
 
     private final SnapshotManager snapshotManager;
 
@@ -35,7 +33,7 @@ public class BatchDataTableScanImpl extends 
AbstractDataTableScan implements Bat
 
     private boolean hasNext;
 
-    public BatchDataTableScanImpl(
+    public InnerTableScanImpl(
             CoreOptions options,
             SnapshotSplitReader snapshotSplitReader,
             SnapshotManager snapshotManager) {
@@ -45,35 +43,11 @@ public class BatchDataTableScanImpl extends 
AbstractDataTableScan implements Bat
     }
 
     @Override
-    public BatchDataTableScan withSnapshot(long snapshotId) {
-        snapshotSplitReader.withSnapshot(snapshotId);
-        return this;
-    }
-
-    @Override
-    public BatchDataTableScan withFilter(Predicate predicate) {
+    public InnerTableScan withFilter(Predicate predicate) {
         snapshotSplitReader.withFilter(predicate);
         return this;
     }
 
-    @Override
-    public BatchDataTableScan withKind(ScanKind scanKind) {
-        snapshotSplitReader.withKind(scanKind);
-        return this;
-    }
-
-    @Override
-    public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
-        snapshotSplitReader.withLevelFilter(levelFilter);
-        return this;
-    }
-
-    @Override
-    public BatchDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
-        this.startingScanner = startingScanner;
-        return this;
-    }
-
     @Override
     public DataFilePlan plan() {
         if (startingScanner == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
deleted file mode 100644
index 4d60c4402..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
-
-/** {@link DataTableScan} for streaming planning. */
-public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan {
-
-    @Override
-    StreamDataTableScan withSnapshot(long snapshotId);
-
-    @Override
-    StreamDataTableScan withFilter(Predicate predicate);
-
-    @Override
-    StreamDataTableScan withKind(ScanKind scanKind);
-
-    @Override
-    StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter);
-
-    boolean supportStreamingReadOverwrite();
-
-    StreamDataTableScan withStartingScanner(StartingScanner startingScanner);
-
-    StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
-
-    StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
-
-    StreamDataTableScan withSnapshotStarting();
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index ac957587a..0f737e7e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,9 +50,8 @@ public class CompactionChangelogFollowUpScanner implements 
FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan scan(
-            long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
+    public TableScan.Plan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader) {
+        return new DataFilePlan(
                 
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index aa9e83e26..2ea87b325 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -20,7 +20,8 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class ContinuousCompactorFollowUpScanner implements 
FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan scan(
-            long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
+    public TableScan.Plan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader) {
+        return new DataFilePlan(
                 
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index 97e30d141..0ba63f71a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan scan(
-            long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
+    public TableScan.Plan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader) {
+        return new DataFilePlan(
                 
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index 33886e5a5..543a06ca9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -19,19 +19,19 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
 
-/** Helper class for the follow-up planning of {@link StreamDataTableScan}. */
+/** Helper class for the follow-up planning of {@link StreamTableScan}. */
 public interface FollowUpScanner {
 
     boolean shouldScanSnapshot(Snapshot snapshot);
 
-    DataTableScan.DataFilePlan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader);
+    TableScan.Plan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader);
 
-    default DataTableScan.DataFilePlan getOverwriteChangesPlan(
+    default TableScan.Plan getOverwriteChangesPlan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
-                
snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+        return new 
DataFilePlan(snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index c901ef528..1e0ea0919 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class InputChangelogFollowUpScanner implements 
FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan scan(
-            long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
+    public TableScan.Plan scan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader) {
+        return new DataFilePlan(
                 
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
index 814ff32e3..d813ffcc7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
@@ -19,17 +19,12 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.Filter;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotSplitReader {
@@ -49,45 +44,4 @@ public interface SnapshotSplitReader {
 
     /** Get splits from an overwrite snapshot. */
     List<DataSplit> overwriteSplits();
-
-    static List<DataSplit> generateSplits(
-            long snapshotId,
-            boolean isIncremental,
-            boolean reverseRowKind,
-            SplitGenerator splitGenerator,
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
-        List<DataSplit> splits = new ArrayList<>();
-        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
-                groupedDataFiles.entrySet()) {
-            BinaryRow partition = entry.getKey();
-            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
-                int bucket = bucketEntry.getKey();
-                if (isIncremental) {
-                    // Don't split when incremental
-                    splits.add(
-                            new DataSplit(
-                                    snapshotId,
-                                    partition,
-                                    bucket,
-                                    bucketEntry.getValue(),
-                                    true,
-                                    reverseRowKind));
-                } else {
-                    splitGenerator.split(bucketEntry.getValue()).stream()
-                            .map(
-                                    files ->
-                                            new DataSplit(
-                                                    snapshotId,
-                                                    partition,
-                                                    bucket,
-                                                    files,
-                                                    false,
-                                                    reverseRowKind))
-                            .forEach(splits::add);
-                }
-            }
-        }
-        return splits;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
index eac6893ca..5b3176e50 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.data.BinaryRow;
@@ -43,7 +44,6 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static 
org.apache.paimon.table.source.snapshot.SnapshotSplitReader.generateSplits;
 
 /** Implementation of {@link SnapshotSplitReader}. */
 public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
@@ -194,4 +194,46 @@ public class SnapshotSplitReaderImpl implements 
SnapshotSplitReader {
         }
         return lazyPartitionComparator;
     }
+
+    @VisibleForTesting
+    public static List<DataSplit> generateSplits(
+            long snapshotId,
+            boolean isIncremental,
+            boolean reverseRowKind,
+            SplitGenerator splitGenerator,
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
+        List<DataSplit> splits = new ArrayList<>();
+        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+                groupedDataFiles.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
+                int bucket = bucketEntry.getKey();
+                if (isIncremental) {
+                    // Don't split when incremental
+                    splits.add(
+                            new DataSplit(
+                                    snapshotId,
+                                    partition,
+                                    bucket,
+                                    bucketEntry.getValue(),
+                                    true,
+                                    reverseRowKind));
+                } else {
+                    splitGenerator.split(bucketEntry.getValue()).stream()
+                            .map(
+                                    files ->
+                                            new DataSplit(
+                                                    snapshotId,
+                                                    partition,
+                                                    bucket,
+                                                    files,
+                                                    false,
+                                                    reverseRowKind))
+                            .forEach(splits::add);
+                }
+            }
+        }
+        return splits;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index e0be16c6e..3e850de3c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -18,9 +18,8 @@
 
 package org.apache.paimon.table.source.snapshot;
 
-import org.apache.paimon.table.source.BatchDataTableScan;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -28,10 +27,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 
-/**
- * Helper class for the first planning of {@link BatchDataTableScan} and {@link
- * StreamDataTableScan}.
- */
+/** Helper class for the first planning of {@link TableScan}. */
 public interface StartingScanner {
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 4fee6fafd..5ccf82563 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -26,8 +26,7 @@ import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
- * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} 
startup mode of a batch
- * read.
+ * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
  */
 public class StaticFromSnapshotStartingScanner implements StartingScanner {
     private final long snapshotId;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 78536c951..a12cd0862 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -33,16 +33,12 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -127,12 +123,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public BatchDataTableScan newScan() {
+    public InnerTableScan newScan() {
         return new AuditLogBatchScan(dataTable.newScan());
     }
 
     @Override
-    public StreamDataTableScan newStreamScan() {
+    public InnerStreamTableScan newStreamScan() {
         return new AuditLogStreamScan(dataTable.newStreamScan());
     }
 
@@ -222,111 +218,45 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
     }
 
-    private class AuditLogBatchScan implements BatchDataTableScan {
+    private class AuditLogBatchScan implements InnerTableScan {
 
-        private final BatchDataTableScan batchScan;
+        private final InnerTableScan batchScan;
 
-        private AuditLogBatchScan(BatchDataTableScan batchScan) {
+        private AuditLogBatchScan(InnerTableScan batchScan) {
             this.batchScan = batchScan;
         }
 
         @Override
-        public BatchDataTableScan withFilter(Predicate predicate) {
+        public InnerTableScan withFilter(Predicate predicate) {
             convert(predicate).ifPresent(batchScan::withFilter);
             return this;
         }
 
         @Override
-        public BatchDataTableScan withKind(ScanKind kind) {
-            batchScan.withKind(kind);
-            return this;
-        }
-
-        @Override
-        public BatchDataTableScan withSnapshot(long snapshotId) {
-            batchScan.withSnapshot(snapshotId);
-            return this;
-        }
-
-        @Override
-        public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) 
{
-            batchScan.withLevelFilter(levelFilter);
-            return this;
-        }
-
-        @Override
-        public DataTableScan.DataFilePlan plan() {
+        public Plan plan() {
             return batchScan.plan();
         }
-
-        @Override
-        public BatchDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
-            return batchScan.withStartingScanner(startingScanner);
-        }
     }
 
-    private class AuditLogStreamScan implements StreamDataTableScan {
+    private class AuditLogStreamScan implements InnerStreamTableScan {
 
-        private final StreamDataTableScan streamScan;
+        private final InnerStreamTableScan streamScan;
 
-        private AuditLogStreamScan(StreamDataTableScan streamScan) {
+        private AuditLogStreamScan(InnerStreamTableScan streamScan) {
             this.streamScan = streamScan;
         }
 
         @Override
-        public StreamDataTableScan withFilter(Predicate predicate) {
+        public InnerStreamTableScan withFilter(Predicate predicate) {
             convert(predicate).ifPresent(streamScan::withFilter);
             return this;
         }
 
         @Override
-        public StreamDataTableScan withKind(ScanKind kind) {
-            streamScan.withKind(kind);
-            return this;
-        }
-
-        @Override
-        public StreamDataTableScan withSnapshot(long snapshotId) {
-            streamScan.withSnapshot(snapshotId);
-            return this;
-        }
-
-        @Override
-        public StreamDataTableScan withLevelFilter(Filter<Integer> 
levelFilter) {
-            streamScan.withLevelFilter(levelFilter);
-            return this;
-        }
-
-        @Override
-        public DataTableScan.DataFilePlan plan() {
+        public Plan plan() {
             return streamScan.plan();
         }
 
-        @Override
-        public boolean supportStreamingReadOverwrite() {
-            return streamScan.supportStreamingReadOverwrite();
-        }
-
-        @Override
-        public StreamDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
-            return streamScan.withStartingScanner(startingScanner);
-        }
-
-        @Override
-        public StreamDataTableScan withFollowUpScanner(FollowUpScanner 
followUpScanner) {
-            return streamScan.withFollowUpScanner(followUpScanner);
-        }
-
-        @Override
-        public StreamDataTableScan withBoundedChecker(BoundedChecker 
boundedChecker) {
-            return streamScan.withBoundedChecker(boundedChecker);
-        }
-
-        @Override
-        public StreamDataTableScan withSnapshotStarting() {
-            return streamScan.withSnapshotStarting();
-        }
-
         @Nullable
         @Override
         public Long checkpoint() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 819454e78..d1ed08ab4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -31,12 +31,11 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -120,12 +119,12 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public BatchDataTableScan newScan() {
+    public InnerTableScan newScan() {
         return wrapped.newScan();
     }
 
     @Override
-    public StreamDataTableScan newStreamScan() {
+    public InnerStreamTableScan newStreamScan() {
         return wrapped.newStreamScan();
     }
 
@@ -140,7 +139,7 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public Table copy(Map<String, String> dynamicOptions) {
+    public BucketsTable copy(Map<String, String> dynamicOptions) {
         return new BucketsTable(wrapped.copy(dynamicOptions), isContinuous);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 3c213b04c..5974925eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -35,11 +35,11 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -166,14 +166,14 @@ public class FilesTable implements ReadonlyTable {
 
         @Override
         public long rowCount() {
-            DataTableScan.DataFilePlan plan = dataFilePlan();
-            if (plan == null) {
-                return 0;
-            }
-            return plan.splits.stream().mapToLong(s -> s.files().size()).sum();
+            TableScan.Plan plan = plan();
+            return plan.splits().stream()
+                    .map(s -> (DataSplit) s)
+                    .mapToLong(s -> s.files().size())
+                    .sum();
         }
 
-        private DataTableScan.DataFilePlan dataFilePlan() {
+        private TableScan.Plan plan() {
             return storeTable.newScan().plan();
         }
 
@@ -224,8 +224,8 @@ public class FilesTable implements ReadonlyTable {
             }
             FilesSplit filesSplit = (FilesSplit) split;
             FileStoreTable table = filesSplit.storeTable;
-            DataTableScan.DataFilePlan dataFilePlan = 
filesSplit.dataFilePlan();
-            if (dataFilePlan == null) {
+            TableScan.Plan plan = filesSplit.plan();
+            if (plan.splits().isEmpty()) {
                 return new IteratorRecordReader<>(Collections.emptyIterator());
             }
 
@@ -260,13 +260,13 @@ public class FilesTable implements ReadonlyTable {
                                     });
                         }
                     };
-            for (DataSplit dataSplit : dataFilePlan.splits) {
+            for (Split dataSplit : plan.splits()) {
                 iteratorList.add(
                         Iterators.transform(
-                                dataSplit.files().iterator(),
+                                ((DataSplit) dataSplit).files().iterator(),
                                 file ->
                                         toRow(
-                                                dataSplit,
+                                                (DataSplit) dataSplit,
                                                 partitionConverter,
                                                 keyConverters,
                                                 file,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index c957ddf28..04ae89aa9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -44,11 +44,9 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
-import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.system.AuditLogTable;
 import org.apache.paimon.types.DataType;
@@ -423,10 +421,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 // partition 2
                                 Collections.singletonList("-D 
2|10|301|binary|varbinary")));
 
-        StreamDataTableScan scan =
-                table.newStreamScan()
-                        .withStartingScanner(new FullStartingScanner())
-                        .withFollowUpScanner(new 
InputChangelogFollowUpScanner());
+        StreamTableScan scan = table.newStreamScan();
         scan.restore(1L);
 
         Function<Integer, Void> assertNextSnapshot =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index ace15b7c3..92853ac9d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -65,23 +65,23 @@ public class StartupModeTest extends ScannerTestBase {
         initializeTestData(); // initialize 3 commits
 
         // streaming Mode
-        StreamDataTableScan dataTableScan = table.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits).isEmpty();
-        assertThat(secondPlan.splits).isEmpty();
+        assertThat(firstPlan.splits()).isEmpty();
+        assertThat(secondPlan.splits()).isEmpty();
 
         // write next data
         writeAndCommit(4, rowData(1, 10, 103L));
-        DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
-        assertThat(thirdPlan.splits)
+        TableScan.Plan thirdPlan = dataTableScan.plan();
+        assertThat(thirdPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = table.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = table.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
     }
 
@@ -91,24 +91,24 @@ public class StartupModeTest extends ScannerTestBase {
         initializeTestData(); // initialize 3 commits
 
         // streaming Mode
-        StreamDataTableScan dataTableScan = table.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits)
+        assertThat(firstPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
-        assertThat(secondPlan.splits).isEmpty();
+        assertThat(secondPlan.splits()).isEmpty();
 
         // write next data
         writeAndCommit(4, rowData(1, 10, 103L));
-        DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
-        assertThat(thirdPlan.splits)
+        TableScan.Plan thirdPlan = dataTableScan.plan();
+        assertThat(thirdPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = table.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = table.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
     }
 
@@ -129,18 +129,18 @@ public class StartupModeTest extends ScannerTestBase {
         FileStoreTable readTable = table.copy(properties);
 
         // streaming Mode
-        StreamDataTableScan dataTableScan = readTable.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = readTable.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits).isEmpty();
-        assertThat(secondPlan.splits)
+        assertThat(firstPlan.splits()).isEmpty();
+        assertThat(secondPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = readTable.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = readTable.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
     }
 
@@ -154,19 +154,19 @@ public class StartupModeTest extends ScannerTestBase {
         writeAndCommit(5, rowData(1, 10, 103L));
 
         // streaming Mode
-        StreamDataTableScan dataTableScan = table.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits)
+        assertThat(firstPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
-        assertThat(secondPlan.splits)
+        assertThat(secondPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(5).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = table.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = table.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
     }
 
@@ -178,18 +178,18 @@ public class StartupModeTest extends ScannerTestBase {
         initializeTestData(); // initialize 3 commits
 
         // streaming Mode
-        StreamDataTableScan dataTableScan = table.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits).isEmpty();
-        assertThat(secondPlan.splits)
+        assertThat(firstPlan.splits()).isEmpty();
+        assertThat(secondPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = table.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = table.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
     }
 
@@ -200,19 +200,19 @@ public class StartupModeTest extends ScannerTestBase {
         initializeTable(StartupMode.FROM_SNAPSHOT_FULL, properties);
         initializeTestData(); // initialize 3 commits
 
-        StreamDataTableScan dataTableScan = table.newStreamScan();
-        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
-        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        TableScan.Plan secondPlan = dataTableScan.plan();
 
-        assertThat(firstPlan.splits)
+        assertThat(firstPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
-        assertThat(secondPlan.splits)
+        assertThat(secondPlan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.DELTA).splits());
 
         // batch mode
-        BatchDataTableScan batchScan = table.newScan();
-        DataTableScan.DataFilePlan plan = batchScan.plan();
-        assertThat(plan.splits)
+        TableScan batchScan = table.newScan();
+        TableScan.Plan plan = batchScan.plan();
+        assertThat(plan.splits())
                 
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
similarity index 94%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index 062a50856..a249b2b2e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -38,15 +38,15 @@ import java.util.Map;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link StreamDataTableScan}. */
-public class StreamDataTableScanTest extends ScannerTestBase {
+/** Tests for {@link StreamTableScan}. */
+public class StreamTableScanTest extends ScannerTestBase {
 
     @Test
     public void testPlan() throws Exception {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        StreamDataTableScan scan = table.newStreamScan();
+        StreamTableScan scan = table.newStreamScan();
 
         // first call without any snapshot, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
@@ -63,7 +63,7 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         commit.commit(1, write.prepareCommit(true, 1));
 
         // first call with snapshot, should return complete records from 2nd 
commit
-        DataTableScan.DataFilePlan plan = scan.plan();
+        TableScan.Plan plan = scan.plan();
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
 
@@ -108,7 +108,7 @@ public class StreamDataTableScanTest extends 
ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        StreamDataTableScan scan = table.newStreamScan();
+        StreamTableScan scan = table.newStreamScan();
 
         // first call without any snapshot, should return empty plan
         assertThat(scan.plan().splits()).isEmpty();
@@ -133,7 +133,7 @@ public class StreamDataTableScanTest extends 
ScannerTestBase {
         commit.commit(2, write.prepareCommit(true, 2));
 
         // first call with snapshot, should return full compacted records from 
3rd commit
-        DataTableScan.DataFilePlan plan = scan.plan();
+        TableScan.Plan plan = scan.plan();
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
 
@@ -178,14 +178,14 @@ public class StreamDataTableScanTest extends 
ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         TableCommitImpl commit = table.newCommit(commitUser);
-        StreamDataTableScan scan = table.newStreamScan();
+        StreamTableScan scan = table.newStreamScan();
 
         write.write(rowData(1, 10, 100L));
         ManifestCommittable committable = new ManifestCommittable(0, 5L);
         write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
         commit.commit(committable);
 
-        DataTableScan.DataFilePlan plan = scan.plan();
+        TableScan.Plan plan = scan.plan();
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
 
@@ -203,14 +203,14 @@ public class StreamDataTableScanTest extends 
ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         TableCommitImpl commit = table.newCommit(commitUser);
-        StreamDataTableScan scan = table.newStreamScan();
+        StreamTableScan scan = table.newStreamScan();
 
         write.write(rowData(1, 10, 100L));
         ManifestCommittable committable = new ManifestCommittable(0, 5L);
         write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
         commit.commit(committable);
 
-        DataTableScan.DataFilePlan plan = scan.plan();
+        TableScan.Plan plan = scan.plan();
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
         assertThat(scan.plan().splits()).isEmpty();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
 b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
similarity index 92%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 61e9236ca..0109125f5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -31,15 +31,15 @@ import java.util.Arrays;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link BatchDataTableScan}. */
-public class BatchDataTableScanTest extends ScannerTestBase {
+/** Tests for {@link TableScan}. */
+public class TableScanTest extends ScannerTestBase {
 
     @Test
     public void testPlan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        BatchDataTableScan scan = table.newScan();
+        TableScan scan = table.newScan();
 
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
@@ -53,7 +53,7 @@ public class BatchDataTableScanTest extends ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
-        DataTableScan.DataFilePlan plan = scan.plan();
+        TableScan.Plan plan = scan.plan();
         assertThat(getResult(table.newRead(), plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index bc3ed34d3..406be8279 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -79,7 +79,7 @@ public class CompactionChangelogFollowUpScannerTest extends 
ScannerTestBase {
         snapshot = snapshotManager.snapshot(3);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.scan(3, snapshotSplitReader);
+        TableScan.Plan plan = scanner.scan(3, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200", 
"+I 1|30|300"));
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 3ee8d0d67..72c0e62f8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -23,8 +23,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.BucketsTable;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
@@ -84,7 +84,7 @@ public class ContinuousCompactorFollowUpScannerTest extends 
ScannerTestBase {
         Snapshot snapshot = snapshotManager.snapshot(1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+        TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
index 116b17702..a70d8f01e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -21,8 +21,8 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -61,7 +61,7 @@ public class DeltaFollowUpScannerTest extends ScannerTestBase 
{
         Snapshot snapshot = snapshotManager.snapshot(1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+        TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", 
"+I 1|40|400"));
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 98a0bd20b..a22d3a16f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -64,7 +64,7 @@ public class InputChangelogFollowUpScannerTest extends 
ScannerTestBase {
         Snapshot snapshot = snapshotManager.snapshot(1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+        TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", 
"+I 1|40|400"));
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 2beddf707..306869692 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -27,7 +27,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableStreamingReader;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.TypeUtils;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
similarity index 79%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index d1f2e1dff..75e5fadb1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -16,15 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.table.source;
+package org.apache.paimon.flink.lookup;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
@@ -34,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.IntUnaryOperator;
@@ -46,10 +53,20 @@ public class TableStreamingReader {
 
     private final ReadBuilder readBuilder;
     @Nullable private final PredicateFilter recordFilter;
-    private final StreamDataTableScan scan;
+    private final StreamTableScan scan;
 
     public TableStreamingReader(Table table, int[] projection, @Nullable 
Predicate predicate) {
+        if (CoreOptions.fromMap(table.options()).startupMode()
+                != CoreOptions.StartupMode.COMPACTED_FULL) {
+            table =
+                    table.copy(
+                            Collections.singletonMap(
+                                    CoreOptions.SCAN_MODE.key(),
+                                    
CoreOptions.StartupMode.LATEST_FULL.toString()));
+        }
+
         this.readBuilder = 
table.newReadBuilder().withProjection(projection).withFilter(predicate);
+        scan = readBuilder.newStreamScan();
 
         if (predicate != null) {
             List<String> fieldNames = table.rowType().getFieldNames();
@@ -75,9 +92,6 @@ public class TableStreamingReader {
         } else {
             recordFilter = null;
         }
-
-        scan = (StreamDataTableScan) readBuilder.newStreamScan();
-        scan.withSnapshotStarting();
     }
 
     public Iterator<InternalRow> nextBatch() throws Exception {
@@ -89,11 +103,11 @@ public class TableStreamingReader {
         }
     }
 
-    private Iterator<InternalRow> read(DataFilePlan plan) throws IOException {
+    private Iterator<InternalRow> read(TableScan.Plan plan) throws IOException 
{
         TableRead read = readBuilder.newRead();
 
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
-        for (DataSplit split : plan.splits) {
+        for (Split split : plan.splits()) {
             readers.add(() -> read.createReader(split));
         }
         Iterator<InternalRow> iterator =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index f49b03309..349c7d5cd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -18,9 +18,9 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
@@ -39,6 +39,7 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -92,14 +93,15 @@ public class CompactorSourceBuilder {
                                     .toArray(Predicate[]::new));
         }
 
-        ReadBuilder readBuilder = 
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
         if (isContinuous) {
+            bucketsTable = bucketsTable.copy(streamingCompactOptions());
             return new ContinuousFileStoreSource(
-                    readBuilder,
+                    
bucketsTable.newReadBuilder().withFilter(partitionPredicate),
                     bucketsTable.options(),
-                    null,
-                    TableScanUtils.compactStreamScanFactory());
+                    null);
         } else {
+            bucketsTable = bucketsTable.copy(batchCompactOptions());
+            ReadBuilder readBuilder = 
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
             List<Split> splits = readBuilder.newScan().plan().splits();
             return new StaticFileStoreSource(
                     readBuilder,
@@ -108,8 +110,7 @@ public class CompactorSourceBuilder {
                             .coreOptions()
                             .toConfiguration()
                             
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
-                    splits,
-                    TableScanUtils.compactBatchScanFactory());
+                    splits);
         }
     }
 
@@ -126,4 +127,25 @@ public class CompactorSourceBuilder {
                 tableIdentifier + "-compact-source",
                 
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
     }
+
+    private Map<String, String> streamingCompactOptions() {
+        // set 'streaming-compact' and remove 'scan.bounded.watermark'
+        return new HashMap<String, String>() {
+            {
+                put(CoreOptions.STREAMING_COMPACT.key(), "true");
+                put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), null);
+            }
+        };
+    }
+
+    private Map<String, String> batchCompactOptions() {
+        // batch compactor source will compact all current files
+        return new HashMap<String, String>() {
+            {
+                put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+                put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+                put(CoreOptions.SCAN_MODE.key(), 
CoreOptions.StartupMode.LATEST_FULL.toString());
+            }
+        };
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index e9c14ca15..54f69ca60 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -44,21 +44,11 @@ public class ContinuousFileStoreSource extends FlinkSource {
     private static final long serialVersionUID = 3L;
 
     private final Map<String, String> options;
-    private final TableScanUtils.StreamTableScanFactory scanFactory;
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder, Map<String, String> options, @Nullable 
Long limit) {
-        this(readBuilder, options, limit, 
TableScanUtils.defaultStreamScanFactory());
-    }
-
-    public ContinuousFileStoreSource(
-            ReadBuilder readBuilder,
-            Map<String, String> options,
-            @Nullable Long limit,
-            TableScanUtils.StreamTableScanFactory scanFactory) {
         super(readBuilder, limit);
         this.options = options;
-        this.scanFactory = scanFactory;
     }
 
     @Override
@@ -77,6 +67,8 @@ public class ContinuousFileStoreSource extends FlinkSource {
             splits = checkpoint.splits();
         }
         CoreOptions coreOptions = CoreOptions.fromMap(options);
+        StreamTableScan scan = readBuilder.newStreamScan();
+        scan.restore(nextSnapshotId);
         return new ContinuousFileSplitEnumerator(
                 context,
                 splits,
@@ -85,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 coreOptions
                         .toConfiguration()
                         
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
-                scanFactory.create(readBuilder, nextSnapshotId));
+                scan);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index c0fa8c9e0..5018130d4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 
@@ -37,7 +36,6 @@ public class StaticFileStoreSource extends FlinkSource {
     private static final long serialVersionUID = 3L;
 
     private final int splitBatchSize;
-    private final TableScanUtils.TableScanFactory scanFactory;
     private final List<Split> splitList;
 
     public StaticFileStoreSource(
@@ -45,19 +43,9 @@ public class StaticFileStoreSource extends FlinkSource {
             @Nullable Long limit,
             int splitBatchSize,
             List<Split> splitList) {
-        this(readBuilder, limit, splitBatchSize, splitList, 
ReadBuilder::newScan);
-    }
-
-    public StaticFileStoreSource(
-            ReadBuilder readBuilder,
-            @Nullable Long limit,
-            int splitBatchSize,
-            List<Split> splitList,
-            TableScanUtils.TableScanFactory scanFactory) {
         super(readBuilder, limit);
         this.splitBatchSize = splitBatchSize;
         this.splitList = splitList;
-        this.scanFactory = scanFactory;
     }
 
     @Override
@@ -79,7 +67,7 @@ public class StaticFileStoreSource extends FlinkSource {
         if (null != splitList) {
             return splitGenerator.createSplits(splitList);
         } else {
-            return 
splitGenerator.createSplits(scanFactory.create(readBuilder).plan());
+            return splitGenerator.createSplits(readBuilder.newScan().plan());
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 9ab22f00c..7d4911d3e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -20,22 +20,11 @@ package org.apache.paimon.flink.utils;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
-import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
 import java.util.HashMap;
 
-/** Utility methods for {@link TableScan}, such as validating and creating. */
+/** Utility methods for {@link TableScan}, such as validating. */
 public class TableScanUtils {
 
     public static void streamingReadingValidate(Table table) {
@@ -60,47 +49,4 @@ public class TableScanUtils {
             }
         }
     }
-
-    // ------------------------------------------------------------------------
-    // TableScan factories
-    // ------------------------------------------------------------------------
-
-    /** Factory to create batch {@link TableScan}. */
-    public interface TableScanFactory extends Serializable {
-        TableScan create(ReadBuilder readBuilder);
-    }
-
-    /** Factory to create {@link StreamTableScan}. */
-    public interface StreamTableScanFactory extends Serializable {
-
-        StreamTableScan create(ReadBuilder readBuilder, @Nullable Long 
nextSnapshotId);
-    }
-
-    public static StreamTableScanFactory defaultStreamScanFactory() {
-        return (builder, nextSnapshotId) -> {
-            StreamTableScan scan = builder.newStreamScan();
-            scan.restore(nextSnapshotId);
-            return scan;
-        };
-    }
-
-    public static TableScanFactory compactBatchScanFactory() {
-        return readBuilder -> {
-            BatchDataTableScan scan = (BatchDataTableScan) 
readBuilder.newScan();
-            // static compactor source will compact all current files
-            scan.withStartingScanner(new FullStartingScanner());
-            return scan;
-        };
-    }
-
-    public static StreamTableScanFactory compactStreamScanFactory() {
-        return (readBuilder, nextSnapshotId) -> {
-            StreamDataTableScan scan = (StreamDataTableScan) 
readBuilder.newStreamScan();
-            scan.withStartingScanner(new ContinuousCompactorStartingScanner())
-                    .withFollowUpScanner(new 
ContinuousCompactorFollowUpScanner())
-                    .withBoundedChecker(BoundedChecker.neverEnd());
-            scan.restore(nextSnapshotId);
-            return scan;
-        };
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index e6f2e85a9..db4e107fd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -24,8 +24,8 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -146,8 +146,8 @@ public class CompactActionITCase extends ActionITCaseBase {
         Assertions.assertEquals(Snapshot.CommitKind.APPEND, 
snapshot.commitKind());
 
         // no full compaction has happened, so plan should be empty
-        StreamDataTableScan scan = table.newStreamScan();
-        DataTableScan.DataFilePlan plan = scan.plan();
+        StreamTableScan scan = table.newStreamScan();
+        TableScan.Plan plan = scan.plan();
         Assertions.assertTrue(plan.splits().isEmpty());
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -209,15 +209,14 @@ public class CompactActionITCase extends ActionITCaseBase 
{
     }
 
     private void validateResult(
-            FileStoreTable table, StreamDataTableScan scan, List<String> 
expected, long timeout)
+            FileStoreTable table, StreamTableScan scan, List<String> expected, 
long timeout)
             throws Exception {
         List<String> actual = new ArrayList<>();
         long start = System.currentTimeMillis();
         while (actual.size() != expected.size()) {
-            DataTableScan.DataFilePlan plan = scan.plan();
-            if (plan != null) {
-                actual.addAll(getResult(table.newRead(), plan.splits(), 
ROW_TYPE));
-            }
+            TableScan.Plan plan = scan.plan();
+            actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
+
             if (System.currentTimeMillis() - start > timeout) {
                 break;
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 6587b5663..65bcf6ba7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -62,7 +62,7 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         assertThat(snapshot.id()).isEqualTo(5);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
-        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newScan().plan();
         assertThat(plan.splits().size()).isEqualTo(2);
         List<String> actual = getResult(table.newRead(), plan.splits(), 
ROW_TYPE);
 
@@ -109,7 +109,7 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         assertThat(snapshot.id()).isEqualTo(5);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
-        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newScan().plan();
         assertThat(plan.splits().size()).isEqualTo(2);
         List<String> actual = getResult(table.newRead(), plan.splits(), 
ROW_TYPE);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index 8950cc377..054ab27fe 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -65,7 +65,7 @@ public abstract class CommitterOperatorTestBase {
         List<String> actual = new ArrayList<>();
         table.newScan()
                 .plan()
-                .splits
+                .splits()
                 .forEach(
                         s -> {
                             try {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index fda4cdd1e..3739f7bc0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -33,7 +33,8 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -116,15 +117,16 @@ public class CompactorSinkITCase extends AbstractTestBase 
{
         assertEquals(3, snapshot.id());
         assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
 
-        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newScan().plan();
         assertEquals(3, plan.splits().size());
-        for (DataSplit split : plan.splits) {
-            if (split.partition().getInt(1) == 15) {
+        for (Split split : plan.splits()) {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.partition().getInt(1) == 15) {
                 // compacted
-                assertEquals(1, split.files().size());
+                assertEquals(1, dataSplit.files().size());
             } else {
                 // not compacted
-                assertEquals(2, split.files().size());
+                assertEquals(2, dataSplit.files().size());
             }
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index 2abeb87e4..bc600a86f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -35,7 +35,7 @@ import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -191,7 +191,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
         TableSchema schema = schemaManager.latest().get();
 
         Map<Integer, Map<String, String>> actual = new HashMap<>();
-        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newScan().plan();
         try (RecordReaderIterator<InternalRow> it =
                 new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
             while (it.hasNext()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index d070a7ab4..05237690e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -43,6 +44,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -77,9 +80,14 @@ public class CompactorSourceITCase extends AbstractTestBase {
         commitUser = UUID.randomUUID().toString();
     }
 
-    @Test
-    public void testBatchRead() throws Exception {
+    @ParameterizedTest(name = "defaultOptions = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testBatchRead(boolean defaultOptions) throws Exception {
         FileStoreTable table = createFileStoreTable();
+        if (!defaultOptions) {
+            // change options to test whether CompactorSourceBuilder work 
normally
+            table = 
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
+        }
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
@@ -115,9 +123,20 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
         it.close();
     }
 
-    @Test
-    public void testStreamingRead() throws Exception {
+    @ParameterizedTest(name = "defaultOptions = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testStreamingRead(boolean defaultOptions) throws Exception {
         FileStoreTable table = createFileStoreTable();
+        if (!defaultOptions) {
+            // change options to test whether CompactorSourceBuilder work 
normally
+            Map<String, String> dynamicOptions = new HashMap<>();
+            dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+            dynamicOptions.put(
+                    CoreOptions.CHANGELOG_PRODUCER.key(),
+                    CoreOptions.ChangelogProducer.NONE.toString());
+            dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
+            table = table.copy(dynamicOptions);
+        }
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 1c9ea6b71..2ecd7f54e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -19,16 +19,11 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
 import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
@@ -193,8 +188,8 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        Queue<DataFilePlan> results = new LinkedBlockingQueue<>();
-        StreamDataTableScan scan = new MockScan(results);
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
                         .setSplitEnumeratorContext(context)
@@ -268,7 +263,7 @@ public class ContinuousFileSplitEnumeratorTest {
         private long discoveryInterval = Long.MAX_VALUE;
 
         private int splitBatchSize = 10;
-        private StreamDataTableScan scan;
+        private StreamTableScan scan;
 
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -291,7 +286,7 @@ public class ContinuousFileSplitEnumeratorTest {
             return this;
         }
 
-        public Builder setScan(StreamDataTableScan scan) {
+        public Builder setScan(StreamTableScan scan) {
             this.scan = scan;
             return this;
         }
@@ -302,67 +297,22 @@ public class ContinuousFileSplitEnumeratorTest {
         }
     }
 
-    private static class MockScan implements StreamDataTableScan {
-        private final Queue<DataFilePlan> results;
+    private static class MockScan implements StreamTableScan {
+        private final Queue<Plan> results;
 
-        public MockScan(Queue<DataFilePlan> results) {
+        public MockScan(Queue<Plan> results) {
             this.results = results;
         }
 
         @Override
-        public StreamDataTableScan withKind(ScanKind kind) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withSnapshot(long snapshotId) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withLevelFilter(Filter<Integer> 
levelFilter) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withFilter(Predicate predicate) {
-            return null;
-        }
-
-        @Override
-        public DataFilePlan plan() {
-            DataFilePlan plan = results.poll();
+        public Plan plan() {
+            Plan plan = results.poll();
             if (plan == null) {
                 throw new EndOfScanException();
             }
             return plan;
         }
 
-        @Override
-        public boolean supportStreamingReadOverwrite() {
-            return false;
-        }
-
-        @Override
-        public StreamDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withFollowUpScanner(FollowUpScanner 
followUpScanner) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withBoundedChecker(BoundedChecker 
boundedChecker) {
-            return null;
-        }
-
-        @Override
-        public StreamDataTableScan withSnapshotStarting() {
-            return null;
-        }
-
         @Override
         public Long checkpoint() {
             return null;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 45c53d18c..9837f1469 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,9 +23,9 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.stats.StatsTestUtils;
+import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
+import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
 
 import org.junit.jupiter.api.Test;
 
@@ -74,13 +74,13 @@ public class FileStoreSourceSplitGeneratorTest {
                     }
                 };
         List<DataSplit> scanSplits =
-                SnapshotSplitReader.generateSplits(
+                SnapshotSplitReaderImpl.generateSplits(
                         1L,
                         false,
                         false,
                         Collections::singletonList,
                         
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
-        DataTableScan.DataFilePlan tableScanPlan = new 
DataTableScan.DataFilePlan(scanSplits);
+        DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);
 
         List<FileStoreSourceSplit> splits =
                 new 
FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 91e778b47..411ea0d03 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,7 +28,8 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -53,10 +54,10 @@ public class PaimonInputFormat implements InputFormat<Void, 
RowDataContainer> {
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
         FileStoreTable table = createFileStoreTable(jobConf);
-        DataTableScan scan = table.newScan();
+        InnerTableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
-        return scan.plan().splits.stream()
-                .map(split -> new 
PaimonInputSplit(table.location().toString(), split))
+        return scan.plan().splits().stream()
+                .map(split -> new 
PaimonInputSplit(table.location().toString(), (DataSplit) split))
                 .toArray(PaimonInputSplit[]::new);
     }
 

Reply via email to