This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b9527aba [FLINK-29736] Abstract a table interface for both data and metadata tables
b9527aba is described below

commit b9527abac18a7c1f4eba13dadfb07227ba23f4b8
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 25 21:53:58 2022 +0800

    [FLINK-29736] Abstract a table interface for both data and metadata tables
    
    This closes #330
---
 .../store/connector/sink/StoreCompactOperator.java |   8 +-
 .../source/ContinuousFileSplitEnumerator.java      |   4 +-
 .../store/connector/source/FileStoreSource.java    |   6 +-
 .../connector/source/FileStoreSourceSplit.java     |  10 +-
 .../source/FileStoreSourceSplitGenerator.java      |   9 +-
 .../source/FileStoreSourceSplitSerializer.java     |   5 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  10 +-
 .../source/FileStoreSourceSplitSerializerTest.java |   4 +-
 .../flink/table/store/file/catalog/Catalog.java    |   6 +-
 .../file/operation/AppendOnlyFileStoreRead.java    |   4 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |   4 +-
 .../table/store/file/operation/FileStoreRead.java  |   4 +-
 .../file/operation/KeyValueFileStoreRead.java      |   9 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   9 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   6 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   6 +-
 .../flink/table/store/table/FileStoreTable.java    |  28 +++--
 .../SupportsPartition.java}                        |  21 +---
 .../SupportsWrite.java}                            |  22 ++--
 .../table/{FileStoreTable.java => Table.java}      |  26 +----
 .../table/source/{Split.java => DataSplit.java}    |  47 ++++++--
 .../source/{TableScan.java => DataTableScan.java}  |  46 ++++----
 .../store/table/source/KeyValueTableRead.java      |   2 +-
 .../store/table/source/SnapshotEnumerator.java     |  10 +-
 .../flink/table/store/table/source/Split.java      |  88 +-------------
 .../flink/table/store/table/source/TableScan.java  | 130 ++-------------------
 .../store/table/source/TableStreamingReader.java   |   8 +-
 .../flink/table/store/file/TestFileStore.java      |   4 +-
 .../file/operation/KeyValueFileStoreReadTest.java  |   4 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |  19 +--
 .../ChangelogValueCountFileStoreTableTest.java     |  14 +--
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  27 ++---
 .../table/store/table/FileStoreTableTestBase.java  |  15 ++-
 .../table/store/table/SchemaEvolutionTest.java     |   2 +-
 .../table/store/table/WritePreemptMemoryTest.java  |   2 +-
 .../flink/table/store/table/source/SplitTest.java  |   6 +-
 .../table/store/mapred/TableStoreInputFormat.java  |   4 +-
 .../table/store/mapred/TableStoreInputSplit.java   |  10 +-
 .../store/mapred/TableStoreInputSplitTest.java     |   4 +-
 .../store/mapred/TableStoreRecordReaderTest.java   |   4 +-
 .../table/store/spark/SparkInputPartition.java     |  18 +--
 .../table/store/spark/SparkReaderFactory.java      |   9 +-
 .../apache/flink/table/store/spark/SparkScan.java  |  22 ++--
 .../flink/table/store/spark/SparkScanBuilder.java  |  10 +-
 .../flink/table/store/spark/SparkSource.java       |   4 +-
 .../apache/flink/table/store/spark/SparkTable.java |  26 +++--
 .../table/store/spark/SparkDataSourceReader.java   |  19 ++-
 .../table/store/spark/SparkInputPartition.java     |  25 +---
 .../flink/table/store/spark/SparkSource.java       |   5 +-
 49 files changed, 289 insertions(+), 496 deletions(-)

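For orientation before the per-file diffs: the commit splits the old concrete FileStoreTable / TableScan / Split types into generic interfaces (Table, SupportsPartition, SupportsWrite, TableScan, Split) plus data-table implementations (DataTableScan, DataSplit). The following condensed sketch of the resulting hierarchy is simplified from the diff below (signatures abbreviated, bodies omitted); the shape of TableScan.Plan is inferred from how DataFilePlan overrides splits().

    // Generic interfaces, shared by data tables and (future) metadata tables.
    public interface Table extends Serializable {
        String name();
        RowType rowType();
        TableScan newScan();
        TableRead newRead();
    }

    public interface SupportsPartition {
        List<String> partitionKeys();
    }

    public interface SupportsWrite {
        TableWrite newWrite();
        TableCommit newCommit(String user);
    }

    public interface TableScan {
        TableScan withFilter(Predicate predicate);
        Plan plan(); // Plan is assumed to expose List<Split> splits(), as DataFilePlan overrides it.
    }

    public interface Split extends Serializable {
        long rowCount();
    }

    // Data tables keep the previous behaviour behind the new interfaces.
    public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
        @Override
        DataTableScan newScan(); // plan() returns DataFilePlan, whose splits are DataSplit instances.
    }
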
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index a703f3fd..e26b0a98 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.predicate.PredicateConverter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.types.logical.RowType;
@@ -75,9 +76,10 @@ public class StoreCompactOperator extends PrepareCommitOperator {
         int task = getRuntimeContext().getIndexOfThisSubtask();
         int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
 
-        for (Split split : scan.plan().splits) {
-            BinaryRowData partition = split.partition();
-            int bucket = split.bucket();
+        for (Split split : scan.plan().splits()) {
+            DataSplit dataSplit = (DataSplit) split;
+            BinaryRowData partition = dataSplit.partition();
+            int bucket = dataSplit.bucket();
             if (Math.abs(Objects.hash(partition, bucket)) % numTask != task) {
                 continue;
             }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 255392b0..14e84dd2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
-import org.apache.flink.table.store.table.source.TableScan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class ContinuousFileSplitEnumerator
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Path location,
-            TableScan scan,
+            DataTableScan scan,
             CoreOptions.ChangelogProducer changelogProducer,
             Collection<FileStoreSourceSplit> remainSplits,
             long currentSnapshotId,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 1df4028d..2aff07c2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -29,8 +29,8 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
 
 import javax.annotation.Nullable;
 
@@ -105,7 +105,7 @@ public class FileStoreSource
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         SnapshotManager snapshotManager = table.snapshotManager();
-        TableScan scan = table.newScan();
+        DataTableScan scan = table.newScan();
         if (predicate != null) {
             scan.withFilter(predicate);
         }
@@ -121,7 +121,7 @@ public class FileStoreSource
                 snapshotId = snapshotManager.latestSnapshotId();
                 splits = new ArrayList<>();
             } else {
-                TableScan.Plan plan = scan.plan();
+                DataTableScan.DataFilePlan plan = scan.plan();
                 snapshotId = plan.snapshotId;
                 splits = new FileStoreSourceSplitGenerator().createSplits(plan);
             }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
index 4ff027ea..2356bab8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import java.util.Objects;
 
@@ -29,21 +29,21 @@ public class FileStoreSourceSplit implements SourceSplit {
     /** The unique ID of the split. Unique within the scope of this source. */
     private final String id;
 
-    private final Split split;
+    private final DataSplit split;
 
     private final long recordsToSkip;
 
-    public FileStoreSourceSplit(String id, Split split) {
+    public FileStoreSourceSplit(String id, DataSplit split) {
         this(id, split, 0);
     }
 
-    public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
+    public FileStoreSourceSplit(String id, DataSplit split, long recordsToSkip) {
         this.id = id;
         this.split = split;
         this.recordsToSkip = recordsToSkip;
     }
 
-    public Split split() {
+    public DataSplit split() {
         return split;
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index dc6b9cd3..5ffc6725 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.store.connector.source;
 
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -35,9 +36,9 @@ public class FileStoreSourceSplitGenerator {
      */
     private final char[] currentId = "0000000000".toCharArray();
 
-    public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
-        return plan.splits.stream()
-                .map(s -> new FileStoreSourceSplit(getNextId(), s))
+    public List<FileStoreSourceSplit> createSplits(DataTableScan.Plan plan) {
+        return plan.splits().stream()
+                .map(s -> new FileStoreSourceSplit(getNextId(), (DataSplit) s))
                 .collect(Collectors.toList());
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 796373b6..4c0d6d2f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -48,6 +48,7 @@ public class FileStoreSourceSplitSerializer
     @Override
     public FileStoreSourceSplit deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
-        return new FileStoreSourceSplit(view.readUTF(), Split.deserialize(view), view.readLong());
+        return new FileStoreSourceSplit(
+                view.readUTF(), DataSplit.deserialize(view), view.readLong());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 955c5640..4e645045 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -75,10 +75,10 @@ public class FileStoreSourceSplitGeneratorTest {
                                 makeEntry(6, 1, "f14"));
                     }
                 };
-        List<Split> scanSplits =
-                TableScan.generateSplits(
+        List<DataSplit> scanSplits =
+                DataTableScan.generateSplits(
                         false, Collections::singletonList, plan.groupByPartFiles());
-        TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
+        DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(1L, scanSplits);
 
         List<FileStoreSourceSplit> splits =
                 new FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 760cf426..3ecc9983 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import org.junit.jupiter.api.Test;
 
@@ -118,7 +118,7 @@ public class FileStoreSourceSplitSerializerTest {
             boolean isIncremental,
             long recordsToSkip) {
         return new FileStoreSourceSplit(
-                id, new Split(partition, bucket, files, isIncremental), recordsToSkip);
+                id, new DataSplit(partition, bucket, files, isIncremental), recordsToSkip);
     }
 
     private static FileStoreSourceSplit serializeAndDeserialize(FileStoreSourceSplit split)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 2d3887c8..a84bb22f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.Table;
 
 import java.util.List;
 import java.util.Optional;
@@ -110,13 +110,13 @@ public interface Catalog extends AutoCloseable {
     TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException;
 
     /**
-     * Return a {@link FileStoreTable} identified by the given {@link ObjectPath}.
+     * Return a {@link Table} identified by the given {@link ObjectPath}.
      *
      * @param tablePath Path of the table
      * @return The requested table
      * @throws TableNotExistException if the target does not exist
      */
-    default FileStoreTable getTable(ObjectPath tablePath) throws TableNotExistException {
+    default Table getTable(ObjectPath tablePath) throws TableNotExistException {
         TableSchema tableSchema = getTableSchema(tablePath);
         return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 29ea31be..cf1b35a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -82,7 +82,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
     }
 
     @Override
-    public RecordReader<RowData> createReader(Split split) throws IOException {
+    public RecordReader<RowData> createReader(DataSplit split) throws IOException {
         BulkFormat<RowData, FileSourceSplit> readerFactory =
                 fileFormat.createReaderFactory(rowType, projection, filters);
         DataFilePathFactory dataFilePathFactory =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 627746c2..41271012 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
@@ -139,7 +139,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
                             new LongCounter(toCompact.get(0).minSequenceNumber()));
             rewriter.write(
                     new RecordReaderIterator<>(
-                            read.createReader(new Split(partition, bucket, toCompact, false))));
+                            read.createReader(new DataSplit(partition, bucket, toCompact, false))));
             rewriter.close();
             return rewriter.result();
         };
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index f540f0cb..adb59879 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import java.io.IOException;
 
@@ -34,5 +34,5 @@ public interface FileStoreRead<T> {
     FileStoreRead<T> withFilter(Predicate predicate);
 
     /** Create a {@link RecordReader} from split. */
-    RecordReader<T> createReader(Split split) throws IOException;
+    RecordReader<T> createReader(DataSplit split) throws IOException;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 63ebb3ed..6914fbcb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
@@ -125,7 +125,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
     }
 
     @Override
-    public RecordReader<KeyValue> createReader(Split split) throws IOException {
+    public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
         if (split.isIncremental()) {
             KeyValueFileReaderFactory readerFactory =
                     readerFactoryBuilder.build(
@@ -154,7 +154,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         }
     }
 
-    private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(Split split) {
+    private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(
+            DataSplit split) {
         List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
         MergeFunction<KeyValue> mergeFunc = mergeFunction.copy();
         for (List<SortedRun> section :
@@ -169,7 +170,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         return sectionReaders;
     }
 
-    private KeyValueFileReaderFactory createReaderFactory(Split split, boolean overlapped) {
+    private KeyValueFileReaderFactory createReaderFactory(DataSplit split, boolean overlapped) {
         return readerFactoryBuilder.build(
                 split.partition(),
                 split.bucket(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index e1eaf354..4d638fb7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -34,10 +34,11 @@ import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
@@ -63,9 +64,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan() {
+    public DataTableScan newScan() {
         AppendOnlyFileStoreScan scan = store.newScan();
-        return new TableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new AppendOnlySplitGenerator(
@@ -97,7 +98,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
             @Override
             public RecordReader<RowData> createReader(Split split) throws IOException {
-                return read.createReader(split);
+                return read.createReader((DataSplit) split);
             }
         };
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 5479dd1e..ebde8da4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -36,11 +36,11 @@ import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -74,9 +74,9 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan() {
+    public DataTableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
-        return new TableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 087f28af..1a0a8263 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -39,11 +39,11 @@ import org.apache.flink.table.store.table.sink.SequenceGenerator;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
 import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -127,9 +127,9 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan() {
+    public DataTableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
-        return new TableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 595a8524..d99019c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -24,28 +24,42 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.types.logical.RowType;
 
-import java.io.Serializable;
+import java.util.List;
 
 /**
  * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
  * and writing of {@link org.apache.flink.table.data.RowData}.
  */
-public interface FileStoreTable extends Serializable {
+public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
 
     CoreOptions options();
 
     Path location();
 
+    @Override
+    default String name() {
+        return location().getName();
+    }
+
+    @Override
+    default RowType rowType() {
+        return schema().logicalRowType();
+    }
+
+    @Override
+    default List<String> partitionKeys() {
+        return schema().partitionKeys();
+    }
+
     TableSchema schema();
 
     SnapshotManager snapshotManager();
 
-    TableScan newScan();
-
-    TableRead newRead();
+    @Override
+    DataTableScan newScan();
 
     TableWrite newWrite();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
similarity index 58%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
index f540f0cb..7d2944ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
@@ -16,23 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.table;
 
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import java.util.List;
 
-import java.io.IOException;
+/** An interface for {@link Table} partition support. */
+public interface SupportsPartition {
 
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
-
-    FileStoreRead<T> withFilter(Predicate predicate);
-
-    /** Create a {@link RecordReader} from split. */
-    RecordReader<T> createReader(Split split) throws IOException;
+    List<String> partitionKeys();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
similarity index 58%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index f540f0cb..97dfa5e5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -16,23 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.table;
 
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
 
-import java.io.IOException;
+/** An interface for {@link Table} write support. */
+public interface SupportsWrite {
 
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
-
-    FileStoreRead<T> withFilter(Predicate predicate);
+    TableWrite newWrite();
 
-    /** Create a {@link RecordReader} from split. */
-    RecordReader<T> createReader(Split split) throws IOException;
+    TableCommit newCommit(String user);
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
similarity index 58%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
index 595a8524..f950aaf3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
@@ -18,36 +18,20 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
 
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Serializable {
-
-    CoreOptions options();
-
-    Path location();
+/** A table provides basic abstraction for table type and table scan and table read. */
+public interface Table extends Serializable {
 
-    TableSchema schema();
+    String name();
 
-    SnapshotManager snapshotManager();
+    RowType rowType();
 
     TableScan newScan();
 
     TableRead newRead();
-
-    TableWrite newWrite();
-
-    TableCommit newCommit(String user);
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
similarity index 69%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
index fd0e143e..8dfeac91 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
@@ -19,26 +19,37 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.utils.SerializationUtils;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 /** Input splits. Needed by most batch computation engines. */
-public class Split {
+public class DataSplit implements Split {
 
-    private final BinaryRowData partition;
-    private final int bucket;
-    private final List<DataFileMeta> files;
-    private final boolean isIncremental;
+    private static final long serialVersionUID = 1L;
 
-    public Split(
+    private BinaryRowData partition;
+    private int bucket;
+    private List<DataFileMeta> files;
+    private boolean isIncremental;
+
+    public DataSplit(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
+        init(partition, bucket, files, isIncremental);
+    }
+
+    private void init(
             BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
         this.partition = partition;
         this.bucket = bucket;
@@ -62,6 +73,15 @@ public class Split {
         return isIncremental;
     }
 
+    @Override
+    public long rowCount() {
+        long rowCount = 0;
+        for (DataFileMeta file : files) {
+            rowCount += file.rowCount();
+        }
+        return rowCount;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -70,7 +90,7 @@ public class Split {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        Split split = (Split) o;
+        DataSplit split = (DataSplit) o;
         return bucket == split.bucket
                 && Objects.equals(partition, split.partition)
                 && Objects.equals(files, split.files)
@@ -82,6 +102,15 @@ public class Split {
         return Objects.hash(partition, bucket, files, isIncremental);
     }
 
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        serialize(new DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        DataSplit split = DataSplit.deserialize(new DataInputViewStreamWrapper(in));
+        init(split.partition, split.bucket, split.files, split.isIncremental);
+    }
+
     public void serialize(DataOutputView out) throws IOException {
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
@@ -93,7 +122,7 @@ public class Split {
         out.writeBoolean(isIncremental);
     }
 
-    public static Split deserialize(DataInputView in) throws IOException {
+    public static DataSplit deserialize(DataInputView in) throws IOException {
         BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
         int fileNumber = in.readInt();
@@ -102,6 +131,6 @@ public class Split {
         for (int i = 0; i < fileNumber; i++) {
             files.add(dataFileSer.deserialize(in));
         }
-        return new Split(partition, bucket, files, in.readBoolean());
+        return new DataSplit(partition, bucket, files, in.readBoolean());
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
similarity index 81%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 64baa300..166c8096 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -37,7 +37,7 @@ import java.util.Optional;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class TableScan {
+public abstract class DataTableScan implements TableScan {
 
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
@@ -45,26 +45,20 @@ public abstract class TableScan {
 
     private boolean isIncremental = false;
 
-    protected TableScan(
+    protected DataTableScan(
             FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.pathFactory = pathFactory;
     }
 
-    public TableScan withSnapshot(long snapshotId) {
+    public DataTableScan withSnapshot(long snapshotId) {
         scan.withSnapshot(snapshotId);
         return this;
     }
 
-    public TableScan withFilter(List<Predicate> predicates) {
-        if (predicates == null || predicates.isEmpty()) {
-            return this;
-        }
-        return withFilter(PredicateBuilder.and(predicates));
-    }
-
-    public TableScan withFilter(Predicate predicate) {
+    @Override
+    public DataTableScan withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
         int[] fieldIdxToPartitionIdx =
                 tableSchema.fields().stream()
@@ -91,34 +85,35 @@ public abstract class TableScan {
         return this;
     }
 
-    public TableScan withIncremental(boolean isIncremental) {
+    public DataTableScan withIncremental(boolean isIncremental) {
         this.isIncremental = isIncremental;
         scan.withIncremental(isIncremental);
         return this;
     }
 
     @VisibleForTesting
-    public TableScan withBucket(int bucket) {
+    public DataTableScan withBucket(int bucket) {
         scan.withBucket(bucket);
         return this;
     }
 
-    public Plan plan() {
+    @Override
+    public DataFilePlan plan() {
         FileStoreScan.Plan plan = scan.plan();
-        return new Plan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
+        return new DataFilePlan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
     }
 
-    private List<Split> generateSplits(
+    private List<DataSplit> generateSplits(
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
         return generateSplits(isIncremental, splitGenerator(pathFactory), groupedDataFiles);
     }
 
     @VisibleForTesting
-    public static List<Split> generateSplits(
+    public static List<DataSplit> generateSplits(
             boolean isIncremental,
             SplitGenerator splitGenerator,
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
-        List<Split> splits = new ArrayList<>();
+        List<DataSplit> splits = new ArrayList<>();
         for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
                 groupedDataFiles.entrySet()) {
             BinaryRowData partition = entry.getKey();
@@ -127,10 +122,10 @@ public abstract class TableScan {
                 int bucket = bucketEntry.getKey();
                 if (isIncremental) {
                     // Don't split when incremental
-                    splits.add(new Split(partition, bucket, bucketEntry.getValue(), true));
+                    splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
                 } else {
                     splitGenerator.split(bucketEntry.getValue()).stream()
-                            .map(files -> new Split(partition, bucket, files, false))
+                            .map(files -> new DataSplit(partition, bucket, files, false))
                             .forEach(splits::add);
                 }
             }
@@ -143,15 +138,20 @@ public abstract class TableScan {
     protected abstract void withNonPartitionFilter(Predicate predicate);
 
     /** Scanning plan containing snapshot ID and input splits. */
-    public static class Plan {
+    public static class DataFilePlan implements Plan {
 
         @Nullable public final Long snapshotId;
-        public final List<Split> splits;
+        public final List<DataSplit> splits;
 
         @VisibleForTesting
-        public Plan(@Nullable Long snapshotId, List<Split> splits) {
+        public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
             this.snapshotId = snapshotId;
             this.splits = splits;
         }
+
+        @Override
+        public List<Split> splits() {
+            return (List) splits;
+        }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index c3939a4d..8a8465c4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -40,7 +40,7 @@ public abstract class KeyValueTableRead implements TableRead {
 
     @Override
     public RecordReader<RowData> createReader(Split split) throws IOException {
-        return new RowDataRecordReader(read.createReader(split));
+        return new RowDataRecordReader(read.createReader((DataSplit) split));
     }
 
     protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
index 40ebf1ee..edbaa1bc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -36,14 +36,14 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
 
     private final SnapshotManager snapshotManager;
-    private final TableScan scan;
+    private final DataTableScan scan;
     private final CoreOptions.ChangelogProducer changelogProducer;
 
     private long nextSnapshotId;
 
     public SnapshotEnumerator(
             Path tablePath,
-            TableScan scan,
+            DataTableScan scan,
             CoreOptions.ChangelogProducer changelogProducer,
             long currentSnapshot) {
         this.snapshotManager = new SnapshotManager(tablePath);
@@ -85,7 +85,7 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
                 continue;
             }
 
-            TableScan.Plan plan = scan.withSnapshot(nextSnapshotId).plan();
+            DataTableScan.DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
             EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan);
             LOG.debug("Find snapshot id {}.", nextSnapshotId);
 
@@ -99,9 +99,9 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
 
         public final long snapshotId;
 
-        public final TableScan.Plan plan;
+        public final DataTableScan.DataFilePlan plan;
 
-        private EnumeratorResult(long snapshotId, TableScan.Plan plan) {
+        private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) {
             this.snapshotId = snapshotId;
             this.plan = plan;
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
index fd0e143e..978756d9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -18,90 +18,10 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.SerializationUtils;
+import java.io.Serializable;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+/** An input split for reading. */
+public interface Split extends Serializable {
 
-/** Input splits. Needed by most batch computation engines. */
-public class Split {
-
-    private final BinaryRowData partition;
-    private final int bucket;
-    private final List<DataFileMeta> files;
-    private final boolean isIncremental;
-
-    public Split(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
-        this.partition = partition;
-        this.bucket = bucket;
-        this.files = files;
-        this.isIncremental = isIncremental;
-    }
-
-    public BinaryRowData partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public List<DataFileMeta> files() {
-        return files;
-    }
-
-    public boolean isIncremental() {
-        return isIncremental;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Split split = (Split) o;
-        return bucket == split.bucket
-                && Objects.equals(partition, split.partition)
-                && Objects.equals(files, split.files)
-                && isIncremental == split.isIncremental;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(partition, bucket, files, isIncremental);
-    }
-
-    public void serialize(DataOutputView out) throws IOException {
-        SerializationUtils.serializeBinaryRow(partition, out);
-        out.writeInt(bucket);
-        out.writeInt(files.size());
-        DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
-        for (DataFileMeta file : files) {
-            dataFileSer.serialize(file, out);
-        }
-        out.writeBoolean(isIncremental);
-    }
-
-    public static Split deserialize(DataInputView in) throws IOException {
-        BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
-        int bucket = in.readInt();
-        int fileNumber = in.readInt();
-        List<DataFileMeta> files = new ArrayList<>(fileNumber);
-        DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
-        for (int i = 0; i < fileNumber; i++) {
-            files.add(dataFileSer.deserialize(in));
-        }
-        return new Split(partition, bucket, files, in.readBoolean());
-    }
+    long rowCount();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 64baa300..68111fc2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -18,140 +18,28 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.table.Table;
 
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
-
-/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class TableScan {
-
-    private final FileStoreScan scan;
-    private final TableSchema tableSchema;
-    private final FileStorePathFactory pathFactory;
 
-    private boolean isIncremental = false;
-
-    protected TableScan(
-            FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) {
-        this.scan = scan;
-        this.tableSchema = tableSchema;
-        this.pathFactory = pathFactory;
-    }
-
-    public TableScan withSnapshot(long snapshotId) {
-        scan.withSnapshot(snapshotId);
-        return this;
-    }
+/** A scan of {@link Table} to generate {@link Split} splits. */
+public interface TableScan {
 
-    public TableScan withFilter(List<Predicate> predicates) {
+    default TableScan withFilter(List<Predicate> predicates) {
         if (predicates == null || predicates.isEmpty()) {
             return this;
         }
         return withFilter(PredicateBuilder.and(predicates));
     }
 
-    public TableScan withFilter(Predicate predicate) {
-        List<String> partitionKeys = tableSchema.partitionKeys();
-        int[] fieldIdxToPartitionIdx =
-                tableSchema.fields().stream()
-                        .mapToInt(f -> partitionKeys.indexOf(f.name()))
-                        .toArray();
+    TableScan withFilter(Predicate predicate);
 
-        List<Predicate> partitionFilters = new ArrayList<>();
-        List<Predicate> nonPartitionFilters = new ArrayList<>();
-        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
-            if (mapped.isPresent()) {
-                partitionFilters.add(mapped.get());
-            } else {
-                nonPartitionFilters.add(p);
-            }
-        }
-
-        if (partitionFilters.size() > 0) {
-            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
-        }
-        if (nonPartitionFilters.size() > 0) {
-            withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
-        }
-        return this;
-    }
-
-    public TableScan withIncremental(boolean isIncremental) {
-        this.isIncremental = isIncremental;
-        scan.withIncremental(isIncremental);
-        return this;
-    }
+    Plan plan();
 
-    @VisibleForTesting
-    public TableScan withBucket(int bucket) {
-        scan.withBucket(bucket);
-        return this;
-    }
-
-    public Plan plan() {
-        FileStoreScan.Plan plan = scan.plan();
-        return new Plan(plan.snapshotId(), 
generateSplits(plan.groupByPartFiles()));
-    }
-
-    private List<Split> generateSplits(
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
-        return generateSplits(isIncremental, splitGenerator(pathFactory), 
groupedDataFiles);
-    }
-
-    @VisibleForTesting
-    public static List<Split> generateSplits(
-            boolean isIncremental,
-            SplitGenerator splitGenerator,
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
-        List<Split> splits = new ArrayList<>();
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
-                groupedDataFiles.entrySet()) {
-            BinaryRowData partition = entry.getKey();
-            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
-                int bucket = bucketEntry.getKey();
-                if (isIncremental) {
-                    // Don't split when incremental
-                    splits.add(new Split(partition, bucket, 
bucketEntry.getValue(), true));
-                } else {
-                    splitGenerator.split(bucketEntry.getValue()).stream()
-                            .map(files -> new Split(partition, bucket, files, 
false))
-                            .forEach(splits::add);
-                }
-            }
-        }
-        return splits;
-    }
-
-    protected abstract SplitGenerator splitGenerator(FileStorePathFactory 
pathFactory);
-
-    protected abstract void withNonPartitionFilter(Predicate predicate);
-
-    /** Scanning plan containing snapshot ID and input splits. */
-    public static class Plan {
-
-        @Nullable public final Long snapshotId;
-        public final List<Split> splits;
-
-        @VisibleForTesting
-        public Plan(@Nullable Long snapshotId, List<Split> splits) {
-            this.snapshotId = snapshotId;
-            this.splits = splits;
-        }
+    /** Plan of a scan, containing the splits to read. */
+    interface Plan {
+        List<Split> splits();
     }
 }
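
The planning entry point is now just filter pushdown plus plan(); data-table
specifics such as withIncremental or withBucket stay on the DataTableScan
implementation (hence the casts in the tests below). A rough usage sketch
against the new interface; ScanExample and planSplits are illustrative names:

    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.store.table.Table;
    import org.apache.flink.table.store.table.source.Split;
    import org.apache.flink.table.store.table.source.TableScan;

    import java.util.List;

    /** Illustrative helper: plan the splits to read for a set of filters. */
    public class ScanExample {

        public static List<Split> planSplits(
                Table table, List<Predicate> predicates) {
            // withFilter(List) is a default method that ANDs the predicates
            TableScan scan = table.newScan().withFilter(predicates);
            // the interface only promises a Plan with splits()
            return scan.plan().splits();
        }
    }
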
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index 49fc4618..a00bed89 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -84,11 +84,11 @@ public class TableStreamingReader {
     @Nullable
     public Iterator<RowData> nextBatch() throws IOException {
         if (enumerator == null) {
-            TableScan scan = table.newScan();
+            DataTableScan scan = table.newScan();
             if (predicate != null) {
                 scan.withFilter(predicate);
             }
-            TableScan.Plan plan = scan.plan();
+            DataTableScan.DataFilePlan plan = scan.plan();
             if (plan.snapshotId == null) {
                 return null;
             }
@@ -109,14 +109,14 @@ public class TableStreamingReader {
         }
     }
 
-    private Iterator<RowData> read(TableScan.Plan plan) throws IOException {
+    private Iterator<RowData> read(DataTableScan.DataFilePlan plan) throws 
IOException {
         TableRead read = table.newRead().withProjection(projection);
         if (predicate != null) {
             read.withFilter(predicate);
         }
 
         List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new 
ArrayList<>();
-        for (Split split : plan.splits) {
+        for (DataSplit split : plan.splits) {
             readers.add(() -> read.createReader(split));
         }
         Iterator<RowData> iterator = new 
RecordReaderIterator<>(ConcatRecordReader.create(readers));
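
The read path itself only changes types: a TableRead is configured once, then
one reader is created per DataSplit and the readers are concatenated. A
minimal sketch of reading a single split; ReadSplitExample and readSplit are
illustrative names, not part of this commit:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.utils.RecordReaderIterator;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.source.DataSplit;
    import org.apache.flink.table.store.table.source.TableRead;

    import java.io.IOException;
    import java.util.Iterator;

    /** Illustrative helper: read one split with the default projection. */
    public class ReadSplitExample {

        public static Iterator<RowData> readSplit(
                FileStoreTable table, DataSplit split) throws IOException {
            TableRead read = table.newRead();
            // createReader opens a RecordReader over the split's data files
            return new RecordReaderIterator<>(read.createReader(split));
        }
    }
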
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index e55d225a..0e471d9f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -44,7 +44,7 @@ import 
org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -297,7 +297,7 @@ public class TestFileStore extends KeyValueFileStore {
                 RecordReaderIterator<KeyValue> iterator =
                         new RecordReaderIterator<>(
                                 read.createReader(
-                                        new Split(
+                                        new DataSplit(
                                                 entryWithPartition.getKey(),
                                                 entryWithBucket.getKey(),
                                                 entryWithBucket.getValue(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 3f32e168..1dcc2abe 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -208,7 +208,7 @@ public class KeyValueFileStoreReadTest {
                 filesGroupedByPartition.entrySet()) {
             RecordReader<KeyValue> reader =
                     read.createReader(
-                            new Split(
+                            new DataSplit(
                                     entry.getKey(),
                                     0,
                                     entry.getValue().stream()
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 072eec88..79296b4a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 
@@ -58,7 +59,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
@@ -82,7 +83,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
                 .hasSameElementsAs(Arrays.asList("100|10", "101|11", "102|12", 
"101|11", "102|12"));
@@ -97,7 +98,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
 
         Predicate predicate = builder.equal(2, 201L);
-        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits;
+        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
@@ -114,7 +115,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -128,7 +129,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -144,7 +145,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         Predicate predicate = builder.equal(2, 101L);
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -197,7 +198,11 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         Predicate partitionFilter =
                 new PredicateBuilder(table.schema().logicalRowType()).equal(0, 
partition);
         List<Split> splits =
-                
table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
+                ((DataTableScan) table.newScan())
+                        .withFilter(partitionFilter)
+                        .withBucket(bucket)
+                        .plan()
+                        .splits();
         TableRead read = table.newRead();
 
         assertThat(getResult(read, splits, binaryRow(partition), bucket, 
STREAMING_ROW_TO_STRING))
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 21aaa6ac..3b4806b8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -51,7 +51,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .isEqualTo(
@@ -72,7 +72,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("101|11", "101|11", "102|12"));
@@ -87,7 +87,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
 
         Predicate predicate = builder.equal(2, 201L);
-        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits;
+        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
@@ -103,7 +103,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -121,7 +121,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -137,7 +137,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
 
         Predicate predicate = builder.equal(2, 201L);
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -188,7 +188,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         write.close();
 
         // check that no data file is produced
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         assertThat(splits).isEmpty();
         // check that no changelog file is produced
         Path bucketPath = DataFilePathFactory.bucketPath(table.location(), 
"1", 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 5cb9659c..b0d96233 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -62,7 +63,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.commit("1", write.prepareCommit(true));
         write.close();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", 
"1|11|101|binary|varbinary"));
@@ -73,7 +74,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 
.isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
@@ -87,7 +88,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Collections.singletonList("1000|10"));
@@ -102,7 +103,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         PredicateBuilder builder = new 
PredicateBuilder(table.schema().logicalRowType());
 
         Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
-        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits;
+        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
@@ -118,7 +119,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 
.isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
@@ -135,7 +136,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         writeData();
         FileStoreTable table = createFileStoreTable();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead().withProjection(PROJECTION);
 
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -152,7 +153,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
         List<Split> splits =
-                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -183,7 +184,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
-        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
@@ -251,7 +252,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         SnapshotEnumerator enumerator =
                 new SnapshotEnumerator(
                         tablePath,
-                        table.newScan().withIncremental(true),
+                        (DataTableScan) table.newScan().withIncremental(true),
                         ChangelogProducer.INPUT,
                         Snapshot.FIRST_SNAPSHOT_ID - 1);
 
@@ -259,7 +260,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
             SnapshotEnumerator.EnumeratorResult result = enumerator.call();
             assertThat(result).isNotNull();
 
-            List<Split> splits = result.plan.splits;
+            List<Split> splits = result.plan.splits();
             TableRead read = table.newRead();
             for (int j = 0; j < 2; j++) {
                 assertThat(getResult(read, splits, binaryRow(j + 1), 0, 
CHANGELOG_ROW_TO_STRING))
@@ -317,7 +318,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.commit("2", write.prepareCommit(true));
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
 
         // push down key filter a = 30
         TableRead read = table.newRead().withFilter(builder.equal(1, 30));
@@ -352,7 +353,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.close();
 
         // cannot push down value filter b = 600L
-        splits = table.newScan().plan().splits;
+        splits = table.newScan().plan().splits();
         read = table.newRead().withFilter(builder.equal(2, 600L));
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
@@ -384,7 +385,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 
.isEqualTo(Collections.singletonList("1|10|200|binary|varbinary"));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 152b0c27..94ed642c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.table.store.file.utils.TraceableFileSystem;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -147,7 +148,7 @@ public abstract class FileStoreTableTestBase {
         commit.withOverwritePartition(overwritePartition).commit("1", 
write.prepareCommit(true));
         write.close();
 
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 
.hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
@@ -177,9 +178,9 @@ public abstract class FileStoreTableTestBase {
                 table.newScan()
                         .withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5))
                         .plan()
-                        .splits;
+                        .splits();
         assertThat(splits.size()).isEqualTo(1);
-        assertThat(splits.get(0).bucket()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
     }
 
     @Test
@@ -204,7 +205,7 @@ public abstract class FileStoreTableTestBase {
         write.close();
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
@@ -278,7 +279,8 @@ public abstract class FileStoreTableTestBase {
         write.close();
 
         List<DataFileMeta> files =
-                table.newScan().plan().splits.stream()
+                table.newScan().plan().splits().stream()
+                        .map(split -> (DataSplit) split)
                         .flatMap(split -> split.files().stream())
                         .collect(Collectors.toList());
         for (DataFileMeta file : files) {
@@ -319,7 +321,8 @@ public abstract class FileStoreTableTestBase {
     private List<Split> getSplitsFor(List<Split> splits, BinaryRowData 
partition, int bucket) {
         List<Split> result = new ArrayList<>();
         for (Split split : splits) {
-            if (split.partition().equals(partition) && split.bucket() == 
bucket) {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.partition().equals(partition) && dataSplit.bucket() 
== bucket) {
                 result.add(split);
             }
         }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index d52df819..025ecb93 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -161,7 +161,7 @@ public class SchemaEvolutionTest {
         if (filter != null) {
             scan.withFilter(filter);
         }
-        for (Split split : scan.plan().splits) {
+        for (Split split : scan.plan().splits()) {
             TableRead read = table.newRead();
             if (filter != null) {
                 read.withFilter(filter);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 353ac24e..b6a09c4a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -77,7 +77,7 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
         write.close();
 
         // read
-        List<Split> splits = table.newScan().plan().splits;
+        List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         List<String> results = new ArrayList<>();
         for (int i = 0; i < 5; i++) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index 0ac385e4..d2cbb40f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link Split}. */
+/** Test for {@link DataSplit}. */
 public class SplitTest {
 
     @Test
@@ -44,12 +44,12 @@ public class SplitTest {
         for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
             files.add(gen.next().meta);
         }
-        Split split = new Split(data.partition, data.bucket, files, false);
+        DataSplit split = new DataSplit(data.partition, data.bucket, files, 
false);
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         split.serialize(new DataOutputViewStreamWrapper(out));
 
-        Split newSplit = Split.deserialize(new 
DataInputDeserializer(out.toByteArray()));
+        DataSplit newSplit = DataSplit.deserialize(new 
DataInputDeserializer(out.toByteArray()));
         assertThat(newSplit).isEqualTo(split);
     }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index f35b29e6..34cd506c 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
 
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -52,7 +52,7 @@ public class TableStoreInputFormat implements 
InputFormat<Void, RowDataContainer
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
         FileStoreTable table = createFileStoreTable(jobConf);
-        TableScan scan = table.newScan();
+        DataTableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
                 .map(split -> new 
TableStoreInputSplit(table.location().toString(), split))
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index eeb3fc1e..ad5f36c2 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.mapred;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -40,17 +40,17 @@ public class TableStoreInputSplit extends FileSplit {
     private static final String[] ANYWHERE = new String[] {"*"};
 
     private String path;
-    private Split split;
+    private DataSplit split;
 
     // public no-argument constructor for deserialization
     public TableStoreInputSplit() {}
 
-    public TableStoreInputSplit(String path, Split split) {
+    public TableStoreInputSplit(String path, DataSplit split) {
         this.path = path;
         this.split = split;
     }
 
-    public Split split() {
+    public DataSplit split() {
         return split;
     }
 
@@ -89,7 +89,7 @@ public class TableStoreInputSplit extends FileSplit {
         int length = dataInput.readInt();
         byte[] bytes = new byte[length];
         dataInput.readFully(bytes);
-        split = Split.deserialize(new DataInputDeserializer(bytes));
+        split = DataSplit.deserialize(new DataInputDeserializer(bytes));
     }
 
     @Override
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index 962fb63f..ef3c3042 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.mapred;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileTestDataGenerator;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -54,7 +54,7 @@ public class TableStoreInputSplitTest {
         TableStoreInputSplit split =
                 new TableStoreInputSplit(
                         tempDir.toString(),
-                        new Split(
+                        new DataSplit(
                                 wantedPartition,
                                 0,
                                 generated.stream()
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index e8106dc3..08186888 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -186,7 +186,7 @@ public class TableStoreRecordReaderTest {
     private TableStoreRecordReader read(
             FileStoreTable table, BinaryRowData partition, int bucket, 
List<String> selectedColumns)
             throws Exception {
-        for (Split split : table.newScan().plan().splits) {
+        for (DataSplit split : table.newScan().plan().splits) {
             if (split.partition().equals(partition) && split.bucket() == 
bucket) {
                 return new TableStoreRecordReader(
                         table.newRead(),
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
index 33e7fa51..4f992945 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -18,22 +18,16 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.store.table.source.Split;
 
 import org.apache.spark.sql.connector.read.InputPartition;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
 /** A Spark {@link InputPartition} for table store. */
 public class SparkInputPartition implements InputPartition {
 
     private static final long serialVersionUID = 1L;
 
-    private transient Split split;
+    private final Split split;
 
     public SparkInputPartition(Split split) {
         this.split = split;
@@ -42,14 +36,4 @@ public class SparkInputPartition implements InputPartition {
     public Split split() {
         return split;
     }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-        split.serialize(new DataOutputViewStreamWrapper(out));
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        in.defaultReadObject();
-        split = Split.deserialize(new DataInputViewStreamWrapper(in));
-    }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index c94db2eb..54b119ea 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -21,7 +21,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.RowType;
@@ -42,19 +42,18 @@ public class SparkReaderFactory implements 
PartitionReaderFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreTable table;
+    private final Table table;
     private final int[] projectedFields;
     private final List<Predicate> predicates;
 
-    public SparkReaderFactory(
-            FileStoreTable table, int[] projectedFields, List<Predicate> 
predicates) {
+    public SparkReaderFactory(Table table, int[] projectedFields, 
List<Predicate> predicates) {
         this.table = table;
         this.projectedFields = projectedFields;
         this.predicates = predicates;
     }
 
     private RowType readRowType() {
-        return TypeUtils.project(table.schema().logicalRowType(), 
projectedFields);
+        return TypeUtils.project(table.rowType(), projectedFields);
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index 88b63127..576373d9 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.utils.TypeUtils;
 
@@ -43,13 +42,13 @@ import java.util.OptionalLong;
  */
 public class SparkScan implements Scan, SupportsReportStatistics {
 
-    protected final FileStoreTable table;
+    protected final Table table;
     private final List<Predicate> predicates;
     private final int[] projectedFields;
 
     private List<Split> splits;
 
-    public SparkScan(FileStoreTable table, List<Predicate> predicates, int[] 
projectedFields) {
+    public SparkScan(Table table, List<Predicate> predicates, int[] 
projectedFields) {
         this.table = table;
         this.predicates = predicates;
         this.projectedFields = projectedFields;
@@ -58,13 +57,12 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
     @Override
     public String description() {
         // TODO add filters
-        return String.format("tablestore(%s)", table.location().getName());
+        return String.format("tablestore(%s)", table.name());
     }
 
     @Override
     public StructType readSchema() {
-        return SparkTypeUtils.fromFlinkRowType(
-                TypeUtils.project(table.schema().logicalRowType(), 
projectedFields));
+        return 
SparkTypeUtils.fromFlinkRowType(TypeUtils.project(table.rowType(), 
projectedFields));
     }
 
     @Override
@@ -86,7 +84,7 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
 
     protected List<Split> splits() {
         if (splits == null) {
-            this.splits = table.newScan().withFilter(predicates).plan().splits;
+            this.splits = 
table.newScan().withFilter(predicates).plan().splits();
         }
         return splits;
     }
@@ -96,9 +94,7 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
         long rowCount = 0L;
 
         for (Split split : splits()) {
-            for (DataFileMeta file : split.files()) {
-                rowCount += file.rowCount();
-            }
+            rowCount += split.rowCount();
         }
 
         final long numRows = rowCount;
@@ -128,13 +124,13 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
         }
 
         SparkScan that = (SparkScan) o;
-        return table.location().equals(that.table.location())
+        return table.name().equals(that.table.name())
                 && readSchema().equals(that.readSchema())
                 && predicates.equals(that.predicates);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(table.location(), readSchema(), predicates);
+        return Objects.hash(table.name(), readSchema(), predicates);
     }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
index be639c1d..ae7faaea 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -35,19 +35,19 @@ import java.util.List;
 public class SparkScanBuilder
         implements ScanBuilder, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns {
 
-    private final FileStoreTable table;
+    private final Table table;
 
     private List<Predicate> predicates = new ArrayList<>();
     private Filter[] pushedFilters;
     private int[] projectedFields;
 
-    public SparkScanBuilder(FileStoreTable table) {
+    public SparkScanBuilder(Table table) {
         this.table = table;
     }
 
     @Override
     public Filter[] pushFilters(Filter[] filters) {
-        SparkFilterConverter converter = new 
SparkFilterConverter(table.schema().logicalRowType());
+        SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
         List<Predicate> predicates = new ArrayList<>();
         List<Filter> pushed = new ArrayList<>();
         for (Filter filter : filters) {
@@ -70,7 +70,7 @@ public class SparkScanBuilder
     @Override
     public void pruneColumns(StructType requiredSchema) {
         String[] pruneFields = requiredSchema.fieldNames();
-        List<String> fieldNames = table.schema().fieldNames();
+        List<String> fieldNames = table.rowType().getFieldNames();
         int[] projected = new int[pruneFields.length];
         for (int i = 0; i < projected.length; i++) {
             projected[i] = fieldNames.indexOf(pruneFields[i]);
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 90468627..87887a2b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.connector.catalog.Table;
@@ -62,7 +61,6 @@ public class SparkSource implements DataSourceRegister, 
TableProvider {
     @Override
     public Table getTable(
             StructType schema, Transform[] partitioning, Map<String, String> 
options) {
-        FileStoreTable table = 
FileStoreTableFactory.create(Configuration.fromMap(options));
-        return new SparkTable(table);
+        return new 
SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
     }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 8cf39668..aaebaeff 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.SupportsPartition;
+import org.apache.flink.table.store.table.Table;
 
-import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -33,12 +33,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.HashSet;
 import java.util.Set;
 
-/** A spark {@link Table} for table store. */
+/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for table 
store. */
 public class SparkTable implements 
org.apache.spark.sql.connector.catalog.Table, SupportsRead {
 
-    private final FileStoreTable table;
+    private final Table table;
 
-    public SparkTable(FileStoreTable table) {
+    public SparkTable(Table table) {
         this.table = table;
     }
 
@@ -50,12 +50,12 @@ public class SparkTable implements 
org.apache.spark.sql.connector.catalog.Table,
 
     @Override
     public String name() {
-        return table.location().getName();
+        return table.name();
     }
 
     @Override
     public StructType schema() {
-        return 
SparkTypeUtils.fromFlinkRowType(table.schema().logicalRowType());
+        return SparkTypeUtils.fromFlinkRowType(table.rowType());
     }
 
     @Override
@@ -67,9 +67,13 @@ public class SparkTable implements 
org.apache.spark.sql.connector.catalog.Table,
 
     @Override
     public Transform[] partitioning() {
-        return table.schema().partitionKeys().stream()
-                .map(FieldReference::apply)
-                .map(IdentityTransform::apply)
-                .toArray(Transform[]::new);
+        if (table instanceof SupportsPartition) {
+            return ((SupportsPartition) table)
+                    .partitionKeys().stream()
+                            .map(FieldReference::apply)
+                            .map(IdentityTransform::apply)
+                            .toArray(Transform[]::new);
+        }
+        return new Transform[0];
     }
 }
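
Partitioning is now an optional capability of the table abstraction: only a
Table that also implements SupportsPartition reports partition keys, so
metadata tables can simply omit the interface. A small sketch of the same
capability check outside Spark, assuming partitionKeys() returns the partition
column names as a List; PartitionKeys and partitionKeysOf are illustrative
names:

    import org.apache.flink.table.store.table.SupportsPartition;
    import org.apache.flink.table.store.table.Table;

    import java.util.Collections;
    import java.util.List;

    /** Illustrative helper: partition keys of a table, if any. */
    public class PartitionKeys {

        public static List<String> partitionKeysOf(Table table) {
            if (table instanceof SupportsPartition) {
                return ((SupportsPartition) table).partitionKeys();
            }
            // non-partitioned and metadata tables report no partition keys
            return Collections.emptyList();
        }
    }
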
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
index 2a056add..96555ee7 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.RowType;
@@ -46,20 +45,20 @@ public class SparkDataSourceReader
                 SupportsPushDownRequiredColumns,
                 SupportsReportStatistics {
 
-    private final FileStoreTable table;
+    private final Table table;
 
     private List<Predicate> predicates = new ArrayList<>();
     private Filter[] pushedFilters;
     private int[] projectedFields;
     private List<Split> splits;
 
-    public SparkDataSourceReader(FileStoreTable table) {
+    public SparkDataSourceReader(Table table) {
         this.table = table;
     }
 
     @Override
     public Filter[] pushFilters(Filter[] filters) {
-        SparkFilterConverter converter = new 
SparkFilterConverter(table.schema().logicalRowType());
+        SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
         List<Predicate> predicates = new ArrayList<>();
         List<Filter> pushed = new ArrayList<>();
         for (Filter filter : filters) {
@@ -82,7 +81,7 @@ public class SparkDataSourceReader
     @Override
     public void pruneColumns(StructType requiredSchema) {
         String[] pruneFields = requiredSchema.fieldNames();
-        List<String> fieldNames = table.schema().fieldNames();
+        List<String> fieldNames = table.rowType().getFieldNames();
         int[] projected = new int[pruneFields.length];
         for (int i = 0; i < projected.length; i++) {
             projected[i] = fieldNames.indexOf(pruneFields[i]);
@@ -95,9 +94,7 @@ public class SparkDataSourceReader
         long rowCount = 0L;
 
         for (Split split : splits()) {
-            for (DataFileMeta file : split.files()) {
-                rowCount += file.rowCount();
-            }
+            rowCount += split.rowCount();
         }
 
         final long numRows = rowCount;
@@ -118,7 +115,7 @@ public class SparkDataSourceReader
 
     @Override
     public StructType readSchema() {
-        RowType rowType = table.schema().logicalRowType();
+        RowType rowType = table.rowType();
         return SparkTypeUtils.fromFlinkRowType(
                 projectedFields == null ? rowType : TypeUtils.project(rowType, 
projectedFields));
     }
@@ -132,7 +129,7 @@ public class SparkDataSourceReader
 
     protected List<Split> splits() {
         if (splits == null) {
-            this.splits = table.newScan().withFilter(predicates).plan().splits;
+            this.splits = 
table.newScan().withFilter(predicates).plan().splits();
         }
         return splits;
     }
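
Column pruning in both Spark integrations now goes through the logical row
type instead of the schema object: required field names are mapped to
positions in table.rowType(). A compact sketch of that mapping; Projection and
projectionOf are illustrative names, not part of this commit:

    import org.apache.flink.table.store.table.Table;

    import java.util.List;

    /** Illustrative helper: map required column names to field indices. */
    public class Projection {

        public static int[] projectionOf(Table table, String[] requiredFields) {
            List<String> fieldNames = table.rowType().getFieldNames();
            int[] projected = new int[requiredFields.length];
            for (int i = 0; i < projected.length; i++) {
                // indexOf yields -1 for unknown columns, as in the readers above
                projected[i] = fieldNames.indexOf(requiredFields[i]);
            }
            return projected;
        }
    }
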
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
index f4c5432a..368e6e82 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.TypeUtils;
@@ -35,8 +33,6 @@ import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.UncheckedIOException;
 import java.util.List;
 
@@ -47,14 +43,13 @@ public class SparkInputPartition implements 
InputPartition<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreTable table;
+    private final Table table;
     private final int[] projectedFields;
     private final List<Predicate> predicates;
-
-    private transient Split split;
+    private final Split split;
 
     public SparkInputPartition(
-            FileStoreTable table, int[] projectedFields, List<Predicate> 
predicates, Split split) {
+            Table table, int[] projectedFields, List<Predicate> predicates, 
Split split) {
         this.table = table;
         this.projectedFields = projectedFields;
         this.predicates = predicates;
@@ -111,17 +106,7 @@ public class SparkInputPartition implements 
InputPartition<InternalRow> {
     }
 
     private RowType readRowType() {
-        RowType rowType = table.schema().logicalRowType();
+        RowType rowType = table.rowType();
         return projectedFields == null ? rowType : TypeUtils.project(rowType, 
projectedFields);
     }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-        split.serialize(new DataOutputViewStreamWrapper(out));
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        in.defaultReadObject();
-        split = Split.deserialize(new DataInputViewStreamWrapper(in));
-    }
 }
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index bd2fde5b..0f8dd8f4 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.sources.DataSourceRegister;
@@ -44,7 +43,7 @@ public class SparkSource implements DataSourceRegister, 
ReadSupport {
 
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
-        FileStoreTable table = 
FileStoreTableFactory.create(Configuration.fromMap(options.asMap()));
-        return new SparkDataSourceReader(table);
+        return new SparkDataSourceReader(
+                
FileStoreTableFactory.create(Configuration.fromMap(options.asMap())));
     }
 }
