This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f8d9bf [FLINK-28264] Refactor split and read in table store
08f8d9bf is described below

commit 08f8d9bf664fbfa7a1f0239ccce6163d88361c0a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 28 13:32:38 2022 +0800

    [FLINK-28264] Refactor split and read in table store
    
    This closes #177
---
 .../source/ContinuousFileSplitEnumerator.java      |   2 +-
 .../store/connector/source/FileStoreSource.java    |   2 +-
 .../connector/source/FileStoreSourceSplit.java     |  52 +++-------
 .../source/FileStoreSourceSplitGenerator.java      |   5 +-
 .../source/FileStoreSourceSplitReader.java         |   4 +-
 .../source/FileStoreSourceSplitSerializer.java     |  22 +---
 .../table/store/connector/sink/StoreSinkTest.java  |   5 +-
 .../store/connector/sink/TestFileStoreTable.java   |   9 +-
 .../source/FileStoreSourceReaderTest.java          |   3 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  20 ++--
 .../source/FileStoreSourceSplitReaderTest.java     |  18 ++--
 .../source/FileStoreSourceSplitSerializerTest.java |  45 ++++++++-
 .../source/FileStoreSourceSplitStateTest.java      |   3 +-
 .../PendingSplitsCheckpointSerializerTest.java     |   7 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java |   3 +-
 .../source/TestChangelogDataReadWrite.java         |   8 +-
 .../apache/flink/table/store/file/KeyValue.java    |   5 +
 .../table/store/file/data/DataFileReader.java      |  11 +-
 .../file/operation/AbstractFileStoreScan.java      |   5 +-
 .../file/operation/AppendOnlyFileStoreRead.java    |   9 +-
 .../table/store/file/operation/FileStoreRead.java  |   9 +-
 .../file/operation/KeyValueFileStoreRead.java      |  68 +++++--------
 .../store/file/utils/ProjectKeyRecordReader.java   |  80 +++++++++++++++
 .../table/store/table/AbstractFileStoreTable.java  |  11 +-
 .../store/table/AppendOnlyFileStoreTable.java      |  20 ++--
 .../table/ChangelogValueCountFileStoreTable.java   |  22 +---
 .../table/ChangelogWithKeyFileStoreTable.java      |  11 +-
 .../flink/table/store/table/FileStoreTable.java    |   3 +-
 .../table/store/table/FileStoreTableFactory.java   |   8 +-
 .../store/table/source/KeyValueTableRead.java      |   8 +-
 .../flink/table/store/table/source/Split.java      |  35 ++-----
 .../flink/table/store/table/source/TableRead.java  |   8 +-
 .../flink/table/store/table/source/TableScan.java  |  20 ++--
 .../source/ValueCountRowDataRecordIterator.java    |  13 +--
 .../flink/table/store/file/TestFileStore.java      |   9 +-
 .../file/operation/KeyValueFileStoreReadTest.java  |  14 +--
 .../store/table/AppendOnlyFileStoreTableTest.java  |   9 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   9 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   9 +-
 .../table/store/table/FileStoreTableTestBase.java  |  17 ++--
 .../table/store/table/WritePreemptMemoryTest.java  |   3 +-
 .../flink/table/store/table/source/SplitTest.java  |   3 +-
 .../ValueCountRowDataRecordIteratorTest.java       |   9 +-
 .../table/store/mapred/TableStoreInputFormat.java  |   6 +-
 .../table/store/mapred/TableStoreInputSplit.java   | 111 +++++++--------------
 .../store/mapred/TableStoreInputSplitTest.java     |  17 ++--
 .../store/mapred/TableStoreRecordReaderTest.java   |   3 +-
 .../table/store/spark/SparkReaderFactory.java      |   4 +-
 .../apache/flink/table/store/spark/SparkScan.java  |   2 +-
 .../apache/flink/table/store/spark/SparkTable.java |   2 +-
 50 files changed, 380 insertions(+), 401 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 40d7fba4..8a8762c7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -95,7 +95,7 @@ public class ContinuousFileSplitEnumerator
     }
 
     private void addSplit(FileStoreSourceSplit split) {
-        bucketSplits.computeIfAbsent(split.bucket(), i -> new LinkedList<>()).add(split);
+        bucketSplits.computeIfAbsent(split.split().bucket(), i -> new LinkedList<>()).add(split);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 956d46d5..2752b64a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -80,7 +80,7 @@ public class FileStoreSource
 
     @Override
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
-        TableRead read = table.newRead().withIncremental(isContinuous);
+        TableRead read = table.newRead();
         if (projectedFields != null) {
             read.withProjection(projectedFields);
         }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
index 114b967d..4ff027ea 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
@@ -19,10 +19,8 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.table.source.Split;
 
-import java.util.List;
 import java.util.Objects;
 
 /** {@link SourceSplit} of file store. */
@@ -31,42 +29,22 @@ public class FileStoreSourceSplit implements SourceSplit {
     /** The unique ID of the split. Unique within the scope of this source. */
     private final String id;
 
-    private final BinaryRowData partition;
-
-    private final int bucket;
-
-    private final List<DataFileMeta> files;
+    private final Split split;
 
     private final long recordsToSkip;
 
-    public FileStoreSourceSplit(
-            String id, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
-        this(id, partition, bucket, files, 0);
+    public FileStoreSourceSplit(String id, Split split) {
+        this(id, split, 0);
     }
 
-    public FileStoreSourceSplit(
-            String id,
-            BinaryRowData partition,
-            int bucket,
-            List<DataFileMeta> files,
-            long recordsToSkip) {
+    public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
         this.id = id;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.files = files;
+        this.split = split;
         this.recordsToSkip = recordsToSkip;
     }
 
-    public BinaryRowData partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public List<DataFileMeta> files() {
-        return files;
+    public Split split() {
+        return split;
     }
 
     public long recordsToSkip() {
@@ -79,7 +57,7 @@ public class FileStoreSourceSplit implements SourceSplit {
     }
 
     public FileStoreSourceSplit updateWithRecordsToSkip(long recordsToSkip) {
-        return new FileStoreSourceSplit(id, partition, bucket, files, recordsToSkip);
+        return new FileStoreSourceSplit(id, split, recordsToSkip);
     }
 
     @Override
@@ -90,16 +68,14 @@ public class FileStoreSourceSplit implements SourceSplit {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        FileStoreSourceSplit split = (FileStoreSourceSplit) o;
-        return bucket == split.bucket
-                && recordsToSkip == split.recordsToSkip
-                && Objects.equals(id, split.id)
-                && Objects.equals(partition, split.partition)
-                && Objects.equals(files, split.files);
+        FileStoreSourceSplit other = (FileStoreSourceSplit) o;
+        return Objects.equals(id, other.id)
+                && Objects.equals(this.split, other.split)
+                && recordsToSkip == other.recordsToSkip;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, partition, bucket, files, recordsToSkip);
+        return Objects.hash(id, split, recordsToSkip);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index ef5efbe4..dc6b9cd3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -37,10 +37,7 @@ public class FileStoreSourceSplitGenerator {
 
     public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
         return plan.splits.stream()
-                .map(
-                        s ->
-                                new FileStoreSourceSplit(
-                                        getNextId(), s.partition(), s.bucket(), s.files()))
+                .map(s -> new FileStoreSourceSplit(getNextId(), s))
                 .collect(Collectors.toList());
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 126ce871..3795259b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -123,9 +123,7 @@ public class FileStoreSourceSplitReader
         }
 
         currentSplitId = nextSplit.splitId();
-        currentReader =
-                tableRead.createReader(
-                        nextSplit.partition(), nextSplit.bucket(), nextSplit.files());
+        currentReader = tableRead.createReader(nextSplit.split());
         currentNumRead = nextSplit.recordsToSkip();
         if (currentNumRead > 0) {
             seek(currentNumRead);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 5f606b96..796373b6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -21,24 +21,15 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
+import org.apache.flink.table.store.table.source.Split;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
-
 /** A {@link SimpleVersionedSerializer} for {@link FileStoreSourceSplit}. */
 public class FileStoreSourceSplitSerializer
         implements SimpleVersionedSerializer<FileStoreSourceSplit> {
 
-    private final DataFileMetaSerializer dataFileSerializer;
-
-    public FileStoreSourceSplitSerializer() {
-        this.dataFileSerializer = new DataFileMetaSerializer();
-    }
-
     @Override
     public int getVersion() {
         return 1;
@@ -49,9 +40,7 @@ public class FileStoreSourceSplitSerializer
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
         view.writeUTF(split.splitId());
-        serializeBinaryRow(split.partition(), view);
-        view.writeInt(split.bucket());
-        dataFileSerializer.serializeList(split.files(), view);
+        split.split().serialize(view);
         view.writeLong(split.recordsToSkip());
         return out.toByteArray();
     }
@@ -59,11 +48,6 @@ public class FileStoreSourceSplitSerializer
     @Override
     public FileStoreSourceSplit deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
-        return new FileStoreSourceSplit(
-                view.readUTF(),
-                deserializeBinaryRow(view),
-                view.readInt(),
-                dataFileSerializer.deserializeList(view),
-                view.readLong());
+        return new FileStoreSourceSplit(view.readUTF(), Split.deserialize(view), view.readLong());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index d572de26..561dbce0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -89,8 +89,9 @@ public class StoreSinkTest {
 
     @Before
     public void before() throws Exception {
+        Path path = new Path(tempFolder.newFolder().toURI().toString());
         TableSchema tableSchema =
-                new SchemaManager(new Path(tempFolder.newFolder().toURI().toString()))
+                new SchemaManager(path)
                         .commitNewVersion(
                                 new UpdateSchema(
                                         RowType.of(
@@ -107,7 +108,7 @@ public class StoreSinkTest {
 
         RowType partitionType = tableSchema.logicalPartitionType();
         fileStore = new TestFileStore(hasPk, partitionType);
-        table = new TestFileStoreTable(fileStore, tableSchema);
+        table = new TestFileStoreTable(path, fileStore, tableSchema);
     }
 
     @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 2bd7e41a..52b7febd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -37,17 +38,19 @@ import org.apache.flink.types.RowKind;
 /** {@link FileStoreTable} for tests. */
 public class TestFileStoreTable implements FileStoreTable {
 
+    private final Path path;
     private final TestFileStore store;
     private final TableSchema tableSchema;
 
-    public TestFileStoreTable(TestFileStore store, TableSchema tableSchema) {
+    public TestFileStoreTable(Path path, TestFileStore store, TableSchema tableSchema) {
+        this.path = path;
         this.store = store;
         this.tableSchema = tableSchema;
     }
 
     @Override
-    public String name() {
-        return "test";
+    public Path location() {
+        return path;
     }
 
     @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 979444f8..626bdc38 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Collections;
 
+import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -63,6 +64,6 @@ public class FileStoreSourceReaderTest {
     }
 
     private static FileStoreSourceSplit createTestFileSplit() {
-        return new FileStoreSourceSplit("id1", row(1), 0, Collections.emptyList());
+        return newSourceSplit("id1", row(1), 0, Collections.emptyList());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 1b20a9df..d25e9119 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.table.store.connector.source;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableScan;
 
@@ -79,17 +77,16 @@ public class FileStoreSourceSplitGeneratorTest {
                 };
         List<Split> scanSplits =
                 TableScan.generateSplits(
-                        new FileStorePathFactory(new Path(tempDir.toUri().toString())),
-                        Collections::singletonList,
-                        plan.groupByPartFiles());
+                        false, Collections::singletonList, plan.groupByPartFiles());
         TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
 
         List<FileStoreSourceSplit> splits =
                 new FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
         assertThat(splits.size()).isEqualTo(12);
         splits.sort(
-                Comparator.comparingInt(o -> ((FileStoreSourceSplit) o).partition().getInt(0))
-                        .thenComparing(o -> ((FileStoreSourceSplit) o).bucket()));
+                Comparator.comparingInt(
+                                o -> ((FileStoreSourceSplit) o).split().partition().getInt(0))
+                        .thenComparing(o -> ((FileStoreSourceSplit) o).split().bucket()));
         assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0", "f1"));
         assertSplit(splits.get(1), "0000000008", 1, 1, Collections.singletonList("f2"));
         assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5"));
@@ -107,9 +104,12 @@ public class FileStoreSourceSplitGeneratorTest {
     private void assertSplit(
             FileStoreSourceSplit split, String splitId, int part, int bucket, List<String> files) {
         assertThat(split.splitId()).isEqualTo(splitId);
-        assertThat(split.partition().getInt(0)).isEqualTo(part);
-        assertThat(split.bucket()).isEqualTo(bucket);
-        
assertThat(split.files().stream().map(DataFileMeta::fileName).collect(Collectors.toList()))
+        assertThat(split.split().partition().getInt(0)).isEqualTo(part);
+        assertThat(split.split().bucket()).isEqualTo(bucket);
+        assertThat(
+                        split.split().files().stream()
+                                .map(DataFileMeta::fileName)
+                                .collect(Collectors.toList()))
                 .isEqualTo(files);
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index b5d39f2e..258f8da6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -95,7 +96,7 @@ public class FileStoreSourceSplitReaderTest {
         List<Tuple2<Long, Long>> input = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
 
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, skip));
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, skip));
 
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
 
@@ -131,8 +132,7 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testPrimaryKeyWithDelete() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader =
-                new FileStoreSourceSplitReader(rw.createReadWithKey().withIncremental(true));
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
 
         List<Tuple2<Long, Long>> input = kvs();
         RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
@@ -150,7 +150,7 @@ public class FileStoreSourceSplitReaderTest {
         List<DataFileMeta> files = writer.prepareCommit().newFiles();
         writer.close();
 
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true));
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
 
         List<Tuple2<RowKind, Long>> expected =
@@ -180,7 +180,7 @@ public class FileStoreSourceSplitReaderTest {
         List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
         files.addAll(files2);
 
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
 
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
         assertRecords(
@@ -212,7 +212,7 @@ public class FileStoreSourceSplitReaderTest {
         List<Tuple2<Long, Long>> input = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
 
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 3));
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 3));
 
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
         assertRecords(
@@ -242,7 +242,7 @@ public class FileStoreSourceSplitReaderTest {
         List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
         files.addAll(files2);
 
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 7));
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 7));
 
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
         assertRecords(
@@ -268,11 +268,11 @@ public class FileStoreSourceSplitReaderTest {
 
         List<Tuple2<Long, Long>> input1 = kvs();
         List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1);
-        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files1));
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files1));
 
         List<Tuple2<Long, Long>> input2 = kvs();
         List<DataFileMeta> files2 = rw.writeFiles(row(2), 1, input2);
-        assignSplit(reader, new FileStoreSourceSplit("id2", row(2), 1, files2));
+        assignSplit(reader, newSourceSplit("id2", row(2), 1, files2));
 
         RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
         assertRecords(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index d01be01c..90c595e4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -19,13 +19,16 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.table.source.Split;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -36,7 +39,7 @@ public class FileStoreSourceSplitSerializerTest {
     @Test
     public void serializeSplit() throws Exception {
         final FileStoreSourceSplit split =
-                new FileStoreSourceSplit("id", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
+                newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
 
         final FileStoreSourceSplit deSerialized = serializeAndDeserialize(split);
 
@@ -46,8 +49,7 @@ public class FileStoreSourceSplitSerializerTest {
     @Test
     public void serializeSplitWithReaderPosition() throws Exception {
         final FileStoreSourceSplit split =
-                new FileStoreSourceSplit(
-                        "id", row(1), 2, Arrays.asList(newFile(0), newFile(1)), 29);
+                newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0), newFile(1)), 29);
 
         final FileStoreSourceSplit deSerialized = serializeAndDeserialize(split);
 
@@ -57,8 +59,7 @@ public class FileStoreSourceSplitSerializerTest {
     @Test
     public void repeatedSerialization() throws Exception {
         final FileStoreSourceSplit split =
-                new FileStoreSourceSplit(
-                        "id", row(1), 2, Arrays.asList(newFile(0), newFile(1)), 29);
+                newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0), newFile(1)), 29);
 
         serializeAndDeserialize(split);
         serializeAndDeserialize(split);
@@ -86,6 +87,40 @@ public class FileStoreSourceSplitSerializerTest {
                 level);
     }
 
+    public static FileStoreSourceSplit newSourceSplit(
+            String id, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+        return newSourceSplit(id, partition, bucket, files, false, 0);
+    }
+
+    public static FileStoreSourceSplit newSourceSplit(
+            String id,
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> files,
+            boolean isIncremental) {
+        return newSourceSplit(id, partition, bucket, files, isIncremental, 0);
+    }
+
+    public static FileStoreSourceSplit newSourceSplit(
+            String id,
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> files,
+            long recordsToSkip) {
+        return newSourceSplit(id, partition, bucket, files, false, recordsToSkip);
+    }
+
+    public static FileStoreSourceSplit newSourceSplit(
+            String id,
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> files,
+            boolean isIncremental,
+            long recordsToSkip) {
+        return new FileStoreSourceSplit(
+                id, new Split(partition, bucket, files, isIncremental), recordsToSkip);
+    }
+
     private static FileStoreSourceSplit serializeAndDeserialize(FileStoreSourceSplit split)
             throws IOException {
         final FileStoreSourceSplitSerializer serializer = new FileStoreSourceSplitSerializer();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
index fa268d4a..e6c72d63 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -66,7 +67,7 @@ public class FileStoreSourceSplitStateTest {
     }
 
     private static FileStoreSourceSplit getTestSplit(long recordsToSkip) {
-        return new FileStoreSourceSplit(
+        return newSourceSplit(
                 "id", row(1), 2, Arrays.asList(newFile(0), newFile(1)), recordsToSkip);
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 124d5a32..a3ab6a2f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -82,15 +83,15 @@ public class PendingSplitsCheckpointSerializerTest {
     // ------------------------------------------------------------------------
 
     private static FileStoreSourceSplit testSplit1() {
-        return new FileStoreSourceSplit("id1", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
+        return newSourceSplit("id1", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
     }
 
     private static FileStoreSourceSplit testSplit2() {
-        return new FileStoreSourceSplit("id2", row(2), 3, Arrays.asList(newFile(2), newFile(3)));
+        return newSourceSplit("id2", row(2), 3, Arrays.asList(newFile(2), newFile(3)));
     }
 
     private static FileStoreSourceSplit testSplit3() {
-        return new FileStoreSourceSplit("id3", row(3), 4, Arrays.asList(newFile(5), newFile(6)));
+        return newSourceSplit("id3", row(3), 4, Arrays.asList(newFile(5), newFile(6)));
     }
 
     private static PendingSplitsCheckpoint serializeAndDeserialize(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
index 7fe861d4..6a60b140 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -96,7 +97,7 @@ public class StaticFileStoreSplitEnumeratorTest {
     // ------------------------------------------------------------------------
 
     private static FileStoreSourceSplit createRandomSplit() {
-        return new FileStoreSourceSplit("split", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
+        return newSourceSplit("split", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
     }
 
     private static StaticFileStoreSplitEnumerator createEnumerator(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 66c66dd0..feba9da2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -95,7 +95,7 @@ public class TestChangelogDataReadWrite {
     }
 
     public TableRead createReadWithValueCount() {
-        return createRead(it -> new ValueCountRowDataRecordIterator(it, null));
+        return createRead(ValueCountRowDataRecordIterator::new);
     }
 
     private TableRead createRead(
@@ -117,12 +117,6 @@ public class TestChangelogDataReadWrite {
                 throw new UnsupportedOperationException();
             }
 
-            @Override
-            public TableRead withIncremental(boolean isIncremental) {
-                read.withDropDelete(!isIncremental);
-                return this;
-            }
-
             @Override
             protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 1978220c..80083b6f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -59,6 +59,11 @@ public class KeyValue {
         return this;
     }
 
+    public KeyValue replaceKey(RowData key) {
+        this.key = key;
+        return this;
+    }
+
     public RowData key() {
         return key;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index aecf25ae..098cf430 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -134,6 +134,7 @@ public class DataFileReader {
         private final FileFormat fileFormat;
         private final FileStorePathFactory pathFactory;
 
+        private final int[][] fullKeyProjection;
         private int[][] keyProjection;
         private int[][] valueProjection;
         private RowType projectedKeyType;
@@ -153,7 +154,8 @@ public class DataFileReader {
             this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
 
-            this.keyProjection = Projection.range(0, keyType.getFieldCount()).toNestedIndexes();
+            this.fullKeyProjection = Projection.range(0, keyType.getFieldCount()).toNestedIndexes();
+            this.keyProjection = fullKeyProjection;
             this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
             applyProjection();
         }
@@ -171,6 +173,13 @@ public class DataFileReader {
         }
 
         public DataFileReader create(BinaryRowData partition, int bucket) {
+            return create(partition, bucket, true);
+        }
+
+        public DataFileReader create(BinaryRowData partition, int bucket, boolean projectKeys) {
+            int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
+            RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;
+
             RowType recordType = KeyValue.schema(keyType, valueType);
             int[][] projection =
                     KeyValue.project(keyProjection, valueProjection, keyType.getFieldCount());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 4745d41e..3fbb2653 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -40,9 +40,8 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -181,7 +180,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
         }
 
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        LinkedHashMap<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
         for (ManifestEntry entry : entries) {
             ManifestEntry.Identifier identifier = entry.identifier();
             switch (entry.kind()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index d8a77d63..346cac87 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -22,7 +22,6 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.AppendOnlyReader;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
@@ -31,6 +30,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -69,14 +69,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
     }
 
     @Override
-    public RecordReader<RowData> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+    public RecordReader<RowData> createReader(Split split) throws IOException {
         BulkFormat<RowData, FileSourceSplit> readerFactory =
                 fileFormat.createReaderFactory(rowType, projection);
         DataFilePathFactory dataFilePathFactory =
-                pathFactory.createDataFilePathFactory(partition, bucket);
+                pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
         List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
-        for (DataFileMeta file : files) {
+        for (DataFileMeta file : split.files()) {
             suppliers.add(
                     () ->
                             new AppendOnlyReader(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index fbe02fea..43729878 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Read operation which provides {@link RecordReader} creation.
@@ -32,7 +30,6 @@ import java.util.List;
  */
 public interface FileStoreRead<T> {
 
-    /** Create a {@link RecordReader} from partition and bucket and files. */
-    RecordReader<T> createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
-            throws IOException;
+    /** Create a {@link RecordReader} from split. */
+    RecordReader<T> createReader(Split split) throws IOException;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 12cc5ed9..f9eb3b2a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
@@ -30,7 +29,9 @@ import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -48,8 +49,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
     private final Comparator<RowData> keyComparator;
     private final MergeFunction mergeFunction;
 
-    private boolean keyProjected;
-    private boolean dropDelete = true;
+    private int[][] keyProjectedFields;
 
     public KeyValueFileStoreRead(
             SchemaManager schemaManager,
@@ -65,18 +65,11 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
                         schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
         this.keyComparator = keyComparator;
         this.mergeFunction = mergeFunction;
-
-        this.keyProjected = false;
-    }
-
-    public KeyValueFileStoreRead withDropDelete(boolean dropDelete) {
-        this.dropDelete = dropDelete;
-        return this;
     }
 
     public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
         dataFileReaderFactory.withKeyProjection(projectedFields);
-        keyProjected = true;
+        this.keyProjectedFields = projectedFields;
         return this;
     }
 
@@ -85,43 +78,34 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         return this;
     }
 
-    /**
-     * The resulting reader has the following characteristics:
-     *
-     * <ul>
-     *   <li>If {@link KeyValueFileStoreRead#withKeyProjection} is called, key-values produced by
-     *       this reader may be unordered and may contain duplicated keys.
-     *   <li>If {@link KeyValueFileStoreRead#withKeyProjection} is not called, key-values produced
-     *       by this reader is guaranteed to be ordered by keys and does not contain duplicated
-     *       keys.
-     * </ul>
-     */
     @Override
-    public RecordReader<KeyValue> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
-        DataFileReader dataFileReader = dataFileReaderFactory.create(partition, bucket);
-        if (keyProjected) {
-            // key projection has been applied, so data file readers will not return key-values in
-            // order, we have to return the raw file contents without merging
+    public RecordReader<KeyValue> createReader(Split split) throws IOException {
+        if (split.isIncremental()) {
+            DataFileReader dataFileReader =
+                    dataFileReaderFactory.create(split.partition(), split.bucket(), true);
+            // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
-            for (DataFileMeta file : files) {
+            for (DataFileMeta file : split.files()) {
                 suppliers.add(() -> dataFileReader.read(file.fileName()));
             }
-
-            if (dropDelete) {
-                throw new UnsupportedOperationException(
-                        "The key is projected, there is no ability to merge records, so the deleted message cannot be dropped.");
-            }
             return ConcatRecordReader.create(suppliers);
         } else {
-            // key projection is not applied, so data file readers will return key-values in order,
-            // in this case merge tree can merge records with same key for us
-            return new MergeTreeReader(
-                    new IntervalPartition(files, keyComparator).partition(),
-                    dropDelete,
-                    dataFileReader,
-                    keyComparator,
-                    mergeFunction.copy());
+            // in this case merge tree should merge records with same key
+            // Do not project key in MergeTreeReader.
+            DataFileReader dataFileReader =
+                    dataFileReaderFactory.create(split.partition(), split.bucket(), false);
+            MergeTreeReader reader =
+                    new MergeTreeReader(
+                            new IntervalPartition(split.files(), keyComparator).partition(),
+                            true,
+                            dataFileReader,
+                            keyComparator,
+                            mergeFunction.copy());
+
+            // project key using ProjectKeyRecordReader
+            return keyProjectedFields == null
+                    ? reader
+                    : new ProjectKeyRecordReader(reader, keyProjectedFields);
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java
new file mode 100644
index 00000000..95abb6ba
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** A {@link RecordReader} with key projection. */
+public class ProjectKeyRecordReader implements RecordReader<KeyValue> {
+
+    private final RecordReader<KeyValue> reader;
+    private final ProjectedRowData projectedRow;
+
+    public ProjectKeyRecordReader(RecordReader<KeyValue> reader, int[][] keyProjectedFields) {
+        this.reader = reader;
+        this.projectedRow = ProjectedRowData.from(keyProjectedFields);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<KeyValue> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = reader.readBatch();
+        if (batch == null) {
+            return null;
+        }
+
+        return new ProjectedIterator(batch, projectedRow);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /** A {@link RecordIterator} with key projection. */
+    public static class ProjectedIterator implements RecordIterator<KeyValue> {
+
+        private final RecordIterator<KeyValue> iterator;
+        private final ProjectedRowData projectedRow;
+
+        public ProjectedIterator(RecordIterator<KeyValue> iterator, ProjectedRowData projectedRow) {
+            this.iterator = iterator;
+            this.projectedRow = projectedRow;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            KeyValue kv = iterator.next();
+            if (kv == null) {
+                return null;
+            }
+            return kv.replaceKey(projectedRow.replaceRow(kv.key()));
+        }
+
+        @Override
+        public void releaseBatch() {
+            iterator.releaseBatch();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index d04f104a..0897416e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -29,19 +30,19 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private final String name;
+    private final Path path;
     protected final TableSchema tableSchema;
 
-    public AbstractFileStoreTable(String name, TableSchema tableSchema) {
-        this.name = name;
+    public AbstractFileStoreTable(Path path, TableSchema tableSchema) {
+        this.path = path;
         this.tableSchema = tableSchema;
     }
 
     protected abstract FileStore<?> store();
 
     @Override
-    public String name() {
-        return name;
+    public Path location() {
+        return path;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index b22fd6e7..4fdede94 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.AppendOnlyFileStore;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -37,6 +36,7 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
@@ -44,7 +44,6 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.List;
 
 /** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
 public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@@ -54,8 +53,8 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     private final AppendOnlyFileStore store;
 
     AppendOnlyFileStoreTable(
-            String name, SchemaManager schemaManager, TableSchema tableSchema, String user) {
-        super(name, tableSchema);
+            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+        super(path, tableSchema);
         this.store =
                 new AppendOnlyFileStore(
                         schemaManager,
@@ -94,15 +93,8 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
             }
 
             @Override
-            public TableRead withIncremental(boolean isIncremental) {
-                return this;
-            }
-
-            @Override
-            public RecordReader<RowData> createReader(
-                    BinaryRowData partition, int bucket, List<DataFileMeta> files)
-                    throws IOException {
-                return read.createReader(partition, bucket, files);
+            public RecordReader<RowData> createReader(Split split) throws IOException {
+                return read.createReader(split);
             }
         };
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 5319c7b5..d622f6ec 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
@@ -56,8 +57,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     private final KeyValueFileStore store;
 
     ChangelogValueCountFileStoreTable(
-            String name, SchemaManager schemaManager, TableSchema tableSchema, String user) {
-        super(name, tableSchema);
+            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+        super(path, tableSchema);
         RowType countType =
                 RowType.of(
                         new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
@@ -96,30 +97,17 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     @Override
     public TableRead newRead() {
         return new KeyValueTableRead(store.newRead()) {
-            private int[][] projection = null;
-            private boolean isIncremental = false;
 
             @Override
             public TableRead withProjection(int[][] projection) {
-                if (isIncremental) {
-                    read.withKeyProjection(projection);
-                } else {
-                    this.projection = projection;
-                }
-                return this;
-            }
-
-            @Override
-            public TableRead withIncremental(boolean isIncremental) {
-                this.isIncremental = isIncremental;
-                read.withDropDelete(!isIncremental);
+                read.withKeyProjection(projection);
                 return this;
             }
 
             @Override
             protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
-                return new ValueCountRowDataRecordIterator(kvRecordIterator, projection);
+                return new ValueCountRowDataRecordIterator(kvRecordIterator);
             }
         };
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index bb26efa0..1c32917f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
@@ -61,8 +62,8 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     private final KeyValueFileStore store;
 
     ChangelogWithKeyFileStoreTable(
-            String name, SchemaManager schemaManager, TableSchema tableSchema, String user) {
-        super(name, tableSchema);
+            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+        super(path, tableSchema);
         RowType rowType = tableSchema.logicalRowType();
 
         // add _KEY_ prefix to avoid conflict with value
@@ -158,12 +159,6 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                 return this;
             }
 
-            @Override
-            public TableRead withIncremental(boolean isIncremental) {
-                read.withDropDelete(!isIncremental);
-                return this;
-            }
-
             @Override
             protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 7b7d13cd..82074f32 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -34,7 +35,7 @@ import java.io.Serializable;
  */
 public interface FileStoreTable extends Serializable {
 
-    String name();
+    Path location();
 
     TableSchema schema();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 418706c4..104ae0bb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -38,7 +38,6 @@ public class FileStoreTableFactory {
 
     public static FileStoreTable create(Configuration conf, String user) {
         Path tablePath = FileStoreOptions.path(conf);
-        String name = tablePath.getName();
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager
@@ -56,13 +55,14 @@ public class FileStoreTableFactory {
         tableSchema = tableSchema.copy(newOptions);
 
         if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
-            return new AppendOnlyFileStoreTable(name, schemaManager, 
tableSchema, user);
+            return new AppendOnlyFileStoreTable(tablePath, schemaManager, 
tableSchema, user);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
                 return new ChangelogValueCountFileStoreTable(
-                        name, schemaManager, tableSchema, user);
+                        tablePath, schemaManager, tableSchema, user);
             } else {
-                return new ChangelogWithKeyFileStoreTable(name, schemaManager, 
tableSchema, user);
+                return new ChangelogWithKeyFileStoreTable(
+                        tablePath, schemaManager, tableSchema, user);
             }
         }
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index f5336065..c3939a4d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -19,16 +19,13 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading 
of {@link RowData}.
@@ -42,9 +39,8 @@ public abstract class KeyValueTableRead implements TableRead {
     }
 
     @Override
-    public RecordReader<RowData> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) 
throws IOException {
-        return new RowDataRecordReader(read.createReader(partition, bucket, 
files));
+    public RecordReader<RowData> createReader(Split split) throws IOException {
+        return new RowDataRecordReader(read.createReader(split));
     }
 
     protected abstract RecordReader.RecordIterator<RowData> 
rowDataRecordIteratorFromKv(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
index 29f5be37..d8dad370 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -31,26 +30,20 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.flink.table.store.file.utils.SerializationUtils.deserializedBytes;
-import static 
org.apache.flink.table.store.file.utils.SerializationUtils.serializeBytes;
-import static org.apache.flink.util.InstantiationUtil.deserializeObject;
-import static org.apache.flink.util.InstantiationUtil.serializeObject;
-
 /** Input splits. Needed by most batch computation engines. */
 public class Split {
 
     private final BinaryRowData partition;
     private final int bucket;
     private final List<DataFileMeta> files;
+    private final boolean isIncremental;
 
-    private final Path bucketPath;
-
-    public Split(BinaryRowData partition, int bucket, List<DataFileMeta> 
files, Path bucketPath) {
+    public Split(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files, 
boolean isIncremental) {
         this.partition = partition;
         this.bucket = bucket;
         this.files = files;
-
-        this.bucketPath = bucketPath;
+        this.isIncremental = isIncremental;
     }
 
     public BinaryRowData partition() {
@@ -65,8 +58,8 @@ public class Split {
         return files;
     }
 
-    public Path bucketPath() {
-        return bucketPath;
+    public boolean isIncremental() {
+        return isIncremental;
     }
 
     @Override
@@ -81,12 +74,12 @@ public class Split {
         return bucket == split.bucket
                 && Objects.equals(partition, split.partition)
                 && Objects.equals(files, split.files)
-                && Objects.equals(bucketPath, split.bucketPath);
+                && isIncremental == split.isIncremental;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(partition, bucket, files, bucketPath);
+        return Objects.hash(partition, bucket, files, isIncremental);
     }
 
     public void serialize(DataOutputView out) throws IOException {
@@ -97,7 +90,7 @@ public class Split {
         for (DataFileMeta file : files) {
             dataFileSer.serialize(file, out);
         }
-        serializeBytes(out, serializeObject(bucketPath));
+        out.writeBoolean(isIncremental);
     }
 
     public static Split deserialize(DataInputView in) throws IOException {
@@ -109,14 +102,6 @@ public class Split {
         for (int i = 0; i < fileNumber; i++) {
             files.add(dataFileSer.deserialize(in));
         }
-        Path bucketPath;
-        try {
-            bucketPath =
-                    deserializeObject(
-                            deserializedBytes(in), 
Thread.currentThread().getContextClassLoader());
-        } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
-        return new Split(partition, bucket, files, bucketPath);
+        return new Split(partition, bucket, files, in.readBoolean());
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 013c1b49..b8fa4061 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -19,14 +19,11 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 
 /** An abstraction layer above {@link FileStoreRead} to provide reading of 
{@link RowData}. */
 public interface TableRead {
@@ -41,8 +38,5 @@ public interface TableRead {
 
     TableRead withProjection(int[][] projection);
 
-    TableRead withIncremental(boolean isIncremental);
-
-    RecordReader<RowData> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) 
throws IOException;
+    RecordReader<RowData> createReader(Split split) throws IOException;
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 2ca147d9..f69ae473 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
@@ -44,6 +43,8 @@ public abstract class TableScan {
     private final TableSchema tableSchema;
     private final FileStorePathFactory pathFactory;
 
+    private boolean isIncremental = false;
+
     protected TableScan(
             FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory 
pathFactory) {
         this.scan = scan;
@@ -91,6 +92,7 @@ public abstract class TableScan {
     }
 
     public TableScan withIncremental(boolean isIncremental) {
+        this.isIncremental = isIncremental;
         scan.withIncremental(isIncremental);
         return this;
     }
@@ -102,12 +104,12 @@ public abstract class TableScan {
 
     private List<Split> generateSplits(
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
-        return generateSplits(pathFactory, splitGenerator(pathFactory), 
groupedDataFiles);
+        return generateSplits(isIncremental, splitGenerator(pathFactory), 
groupedDataFiles);
     }
 
     @VisibleForTesting
     public static List<Split> generateSplits(
-            FileStorePathFactory pathFactory,
+            boolean isIncremental,
             SplitGenerator splitGenerator,
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
         List<Split> splits = new ArrayList<>();
@@ -117,10 +119,14 @@ public abstract class TableScan {
             Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
             for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
                 int bucket = bucketEntry.getKey();
-                Path bucketPath = pathFactory.bucketPath(partition, bucket);
-                splitGenerator.split(bucketEntry.getValue()).stream()
-                        .map(files -> new Split(partition, bucket, files, 
bucketPath))
-                        .forEach(splits::add);
+                if (isIncremental) {
+                    // Don't split when incremental
+                    splits.add(new Split(partition, bucket, 
bucketEntry.getValue(), true));
+                } else {
+                    splitGenerator.split(bucketEntry.getValue()).stream()
+                            .map(files -> new Split(partition, bucket, files, 
false))
+                            .forEach(splits::add);
+                }
             }
         }
         return splits;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
index 1d37cc0d..246a683a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
@@ -19,13 +19,10 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.types.RowKind;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /**
@@ -35,16 +32,11 @@ import java.io.IOException;
  */
 public class ValueCountRowDataRecordIterator extends 
ResetRowKindRecordIterator {
 
-    private final @Nullable ProjectedRowData projectedRowData;
-
     private RowData rowData;
     private long count;
 
-    public ValueCountRowDataRecordIterator(
-            RecordReader.RecordIterator<KeyValue> kvIterator, @Nullable 
int[][] projection) {
+    public 
ValueCountRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> 
kvIterator) {
         super(kvIterator);
-        this.projectedRowData = projection == null ? null : 
ProjectedRowData.from(projection);
-
         this.rowData = null;
         this.count = 0;
     }
@@ -61,8 +53,7 @@ public class ValueCountRowDataRecordIterator extends 
ResetRowKindRecordIterator
                     return null;
                 }
 
-                rowData =
-                        projectedRowData == null ? kv.key() : 
projectedRowData.replaceRow(kv.key());
+                rowData = kv.key();
                 long value = kv.value().getLong(0);
                 if (value < 0) {
                     rowData.setRowKind(RowKind.DELETE);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index bf3fce36..fb290140 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -292,9 +293,11 @@ public class TestFileStore extends KeyValueFileStore {
                 RecordReaderIterator<KeyValue> iterator =
                         new RecordReaderIterator<>(
                                 read.createReader(
-                                        entryWithPartition.getKey(),
-                                        entryWithBucket.getKey(),
-                                        entryWithBucket.getValue()));
+                                        new Split(
+                                                entryWithPartition.getKey(),
+                                                entryWithBucket.getKey(),
+                                                entryWithBucket.getValue(),
+                                                false)));
                 while (iterator.hasNext()) {
                     kvs.add(iterator.next().copy(keySerializer, 
valueSerializer));
                 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 30866470..a2086ba8 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -192,7 +193,6 @@ public class KeyValueFileStoreReadTest {
         KeyValueFileStoreRead read = store.newRead();
         if (keyProjection != null) {
             read.withKeyProjection(keyProjection);
-            read.withDropDelete(false);
         }
         if (valueProjection != null) {
             read.withValueProjection(valueProjection);
@@ -203,11 +203,13 @@ public class KeyValueFileStoreReadTest {
                 filesGroupedByPartition.entrySet()) {
             RecordReader<KeyValue> reader =
                     read.createReader(
-                            entry.getKey(),
-                            0,
-                            entry.getValue().stream()
-                                    .map(ManifestEntry::file)
-                                    .collect(Collectors.toList()));
+                            new Split(
+                                    entry.getKey(),
+                                    0,
+                                    entry.getValue().stream()
+                                            .map(ManifestEntry::file)
+                                            .collect(Collectors.toList()),
+                                    false));
             RecordReaderIterator<KeyValue> actualIterator = new 
RecordReaderIterator<>(reader);
             while (actualIterator.hasNext()) {
                 result.add(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 92fc82ac..85579d09 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -98,7 +98,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -111,7 +111,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = 
table.newRead().withIncremental(true).withProjection(PROJECTION);
+        TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+101|11", "+102|12"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -127,7 +127,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         Predicate predicate = builder.equal(2, 101L);
         List<Split> splits =
                 
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
@@ -175,7 +175,6 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 Collections.emptyList(),
                                 conf.toMap(),
                                 ""));
-        return new AppendOnlyFileStoreTable(
-                tablePath.getName(), schemaManager, tableSchema, "user");
+        return new AppendOnlyFileStoreTable(tablePath, schemaManager, 
tableSchema, "user");
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 6f2cccc1..fcc1a3f8 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -97,7 +97,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -110,7 +110,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = 
table.newRead().withIncremental(true).withProjection(PROJECTION);
+        TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("-100|10", "+101|11"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -126,7 +126,7 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
         Predicate predicate = builder.equal(2, 201L);
         List<Split> splits =
                 
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -178,7 +178,6 @@ public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBas
                                 Collections.emptyList(),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogValueCountFileStoreTable(
-                tablePath.getName(), schemaManager, tableSchema, "user");
+        return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, 
tableSchema, "user");
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 8cdad460..6b9e7a49 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -97,7 +97,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(Collections.singletonList("-1|11|1001"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
@@ -110,7 +110,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
-        TableRead read = 
table.newRead().withIncremental(true).withProjection(PROJECTION);
+        TableRead read = table.newRead().withProjection(PROJECTION);
 
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
                 .isEqualTo(Collections.singletonList("-1001|11"));
@@ -127,7 +127,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), 
builder.equal(1, 21));
         List<Split> splits =
                 
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
-        TableRead read = table.newRead().withIncremental(true);
+        TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
                 .isEqualTo(
@@ -177,7 +177,6 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(
-                tablePath.getName(), schemaManager, tableSchema, "user");
+        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, 
tableSchema, "user");
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 0dd8171b..d36f6423 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -23,7 +23,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import 
org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.sink.TableWrite;
@@ -103,8 +104,11 @@ public abstract class FileStoreTableTestBase {
             int bucket,
             Function<RowData, String> rowDataToString)
             throws Exception {
-        RecordReader<RowData> recordReader =
-                read.createReader(partition, bucket, getFilesFor(splits, 
partition, bucket));
+        List<ReaderSupplier<RowData>> readers = new ArrayList<>();
+        for (Split split : getSplitsFor(splits, partition, bucket)) {
+            readers.add(() -> read.createReader(split));
+        }
+        RecordReader<RowData> recordReader = 
ConcatRecordReader.create(readers);
         RecordReaderIterator<RowData> iterator = new 
RecordReaderIterator<>(recordReader);
         List<String> result = new ArrayList<>();
         while (iterator.hasNext()) {
@@ -115,12 +119,11 @@ public abstract class FileStoreTableTestBase {
         return result;
     }
 
-    private List<DataFileMeta> getFilesFor(
-            List<Split> splits, BinaryRowData partition, int bucket) {
-        List<DataFileMeta> result = new ArrayList<>();
+    private List<Split> getSplitsFor(List<Split> splits, BinaryRowData 
partition, int bucket) {
+        List<Split> result = new ArrayList<>();
         for (Split split : splits) {
             if (split.partition().equals(partition) && split.bucket() == 
bucket) {
-                result.addAll(split.files());
+                result.add(split);
             }
         }
         return result;
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 0e81baec..4c6d02fe 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -103,7 +103,6 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(
-                tablePath.getName(), schemaManager, schema, "user");
+        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, 
schema, "user");
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index 9eb754e6..7540b7b5 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -45,7 +44,7 @@ public class SplitTest {
         for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
             files.add(gen.next().meta);
         }
-        Split split = new Split(data.partition, data.bucket, files, new 
Path("/tmp/test"));
+        Split split = new Split(data.partition, data.bucket, files, false);
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         split.serialize(new DataOutputViewStreamWrapper(out));
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
index 0f6dd6f5..72a29253 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.store.table.source;
 
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
 import org.apache.flink.table.store.file.utils.ReusingTestData;
 import org.apache.flink.types.RowKind;
 
@@ -48,7 +50,7 @@ public class ValueCountRowDataRecordIteratorTest extends 
RowDataRecordIteratorTe
 
         testIterator(
                 input,
-                kvIterator -> new ValueCountRowDataRecordIterator(kvIterator, 
null),
+                ValueCountRowDataRecordIterator::new,
                 (rowData, idx) -> {
                     assertThat(rowData.getArity()).isEqualTo(1);
                     
assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
@@ -75,7 +77,10 @@ public class ValueCountRowDataRecordIteratorTest extends RowDataRecordIteratorTe
                 input,
                 kvIterator ->
                         new ValueCountRowDataRecordIterator(
-                                kvIterator, new int[][] {new int[] {0}, new int[] {0}}),
+                                new ProjectKeyRecordReader.ProjectedIterator(
+                                        kvIterator,
+                                        ProjectedRowData.from(
+                                                new int[][] {new int[] {0}, new int[] {0}}))),
                 (rowData, idx) -> {
                     assertThat(rowData.getArity()).isEqualTo(2);
                     assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 726ca878..98f502ac 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -55,7 +55,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         TableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
-                .map(TableStoreInputSplit::create)
+                .map(split -> TableStoreInputSplit.create(table.location().toString(), split))
                 .toArray(TableStoreInputSplit[]::new);
     }
 
@@ -65,9 +65,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         FileStoreTable table = createFileStoreTable(jobConf);
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
         long splitLength = split.getLength();
-        return new TableStoreRecordReader(
-                table.newRead().createReader(split.partition(), split.bucket(), split.files()),
-                splitLength);
+        return new TableStoreRecordReader(table.newRead().createReader(split.split()), splitLength);
     }
 
     private FileStoreTable createFileStoreTable(JobConf jobConf) {
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 4bbef384..2c041305 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -18,24 +18,17 @@
 
 package org.apache.flink.table.store.mapred;
 
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.SerializationUtils;
 import org.apache.flink.table.store.table.source.Split;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -46,42 +39,28 @@ public class TableStoreInputSplit extends FileSplit {
 
     private static final String[] ANYWHERE = new String[] {"*"};
 
-    private BinaryRowData partition;
-    private int bucket;
-    private List<DataFileMeta> files;
-    private String bucketPath;
+    private String path;
+    private Split split;
 
     // public no-argument constructor for deserialization
     public TableStoreInputSplit() {}
 
-    public TableStoreInputSplit(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files, String bucketPath) {
-        this.partition = partition;
-        this.bucket = bucket;
-        this.files = files;
-        this.bucketPath = bucketPath;
+    public TableStoreInputSplit(String path, Split split) {
+        this.path = path;
+        this.split = split;
     }
 
-    public static TableStoreInputSplit create(Split split) {
-        return new TableStoreInputSplit(
-                split.partition(), split.bucket(), split.files(), split.bucketPath().toString());
+    public static TableStoreInputSplit create(String path, Split split) {
+        return new TableStoreInputSplit(path, split);
     }
 
-    public BinaryRowData partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public List<DataFileMeta> files() {
-        return files;
+    public Split split() {
+        return split;
     }
 
     @Override
     public Path getPath() {
-        return new Path(bucketPath);
+        return new Path(path);
     }
 
     @Override
@@ -91,7 +70,7 @@ public class TableStoreInputSplit extends FileSplit {
 
     @Override
     public long getLength() {
-        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+        return split.files().stream().mapToLong(DataFileMeta::fileSize).sum();
     }
 
     @Override
@@ -101,63 +80,41 @@ public class TableStoreInputSplit extends FileSplit {
 
     @Override
     public void write(DataOutput dataOutput) throws IOException {
-        dataOutput.writeInt(bucket);
-
-        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(bos);
-            SerializationUtils.serializeBinaryRow(partition, view);
-            DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
-            metaSerializer.serializeList(files, view);
-            byte[] bytes = bos.toByteArray();
-            dataOutput.writeInt(bytes.length);
-            dataOutput.write(bytes);
-        }
-
-        byte[] bucketPathBytes = bucketPath.getBytes();
-        dataOutput.writeInt(bucketPathBytes.length);
-        dataOutput.write(bucketPathBytes);
+        dataOutput.writeUTF(path);
+        DataOutputSerializer out = new DataOutputSerializer(128);
+        split.serialize(out);
+        dataOutput.writeInt(out.length());
+        dataOutput.write(out.getCopyOfBuffer());
     }
 
     @Override
     public void readFields(DataInput dataInput) throws IOException {
-        bucket = dataInput.readInt();
-
-        byte[] fieldBytes = new byte[dataInput.readInt()];
-        dataInput.readFully(fieldBytes);
-        try (ByteArrayInputStream bis = new ByteArrayInputStream(fieldBytes)) {
-            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(bis);
-            partition = SerializationUtils.deserializeBinaryRow(view);
-            DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
-            files = metaSerializer.deserializeList(view);
-        }
-
-        byte[] bucketPathBytes = new byte[dataInput.readInt()];
-        dataInput.readFully(bucketPathBytes);
-        bucketPath = new String(bucketPathBytes);
+        path = dataInput.readUTF();
+        int length = dataInput.readInt();
+        byte[] bytes = new byte[length];
+        dataInput.readFully(bytes);
+        split = Split.deserialize(new DataInputDeserializer(bytes));
     }
 
     @Override
     public String toString() {
-        return "{"
-                + String.join(
-                        ", ",
-                        Arrays.asList(
-                                "partition: " + partition.toString(),
-                                "bucket: " + bucket,
-                                "files: " + files.toString(),
-                                "bucketPath: " + bucketPath))
-                + "}";
+        return "{" + "path='" + path + '\'' + ", split=" + split + '}';
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof TableStoreInputSplit)) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
         TableStoreInputSplit that = (TableStoreInputSplit) o;
-        return Objects.equals(partition, that.partition)
-                && bucket == that.bucket
-                && Objects.equals(files, that.files)
-                && Objects.equals(bucketPath, that.bucketPath);
+        return Objects.equals(path, that.path) && Objects.equals(split, that.split);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(path, split);
     }
 }
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index 90e25fcf..4321abd7 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.mapred;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
+import org.apache.flink.table.store.table.source.Split;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -52,13 +53,15 @@ public class TableStoreInputSplitTest {
         BinaryRowData wantedPartition = generated.get(0).partition;
         TableStoreInputSplit split =
                 new TableStoreInputSplit(
-                        wantedPartition,
-                        0,
-                        generated.stream()
-                                .filter(d -> d.partition.equals(wantedPartition))
-                                .map(d -> d.meta)
-                                .collect(Collectors.toList()),
-                        tempDir.toString());
+                        tempDir.toString(),
+                        new Split(
+                                wantedPartition,
+                                0,
+                                generated.stream()
+                                        .filter(d -> d.partition.equals(wantedPartition))
+                                        .map(d -> d.meta)
+                                        .collect(Collectors.toList()),
+                                false));
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(baos);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 064a239e..6befdfff 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -142,7 +142,8 @@ public class TableStoreRecordReaderTest {
         for (Split split : table.newScan().plan().splits) {
             if (split.partition().equals(partition) && split.bucket() == bucket) {
                 return Tuple2.of(
-                        table.newRead().createReader(partition, bucket, split.files()),
+                        table.newRead()
+                                .createReader(new Split(partition, bucket, split.files(), false)),
                         split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
             }
         }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index 2e792aff..f247f908 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -21,7 +21,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -52,13 +51,12 @@ public class SparkReaderFactory implements PartitionReaderFactory {
 
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
-        Split split = ((SparkInputPartition) partition).split();
         RecordReader<RowData> reader;
         try {
             reader =
                     table.newRead()
                             .withProjection(projectedFields)
-                            .createReader(split.partition(), split.bucket(), split.files());
+                            .createReader(((SparkInputPartition) partition).split());
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index f7425953..564ca215 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -57,7 +57,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
     @Override
     public String description() {
         // TODO add filters
-        return String.format("tablestore(%s)", table.name());
+        return String.format("tablestore(%s)", table.location().getName());
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 0d1648ae..c5ec1a18 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -47,7 +47,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
 
     @Override
     public String name() {
-        return table.name();
+        return table.location().getName();
     }
 
     @Override

Reply via email to