This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b9c186e [FLINK-27973] Introduce TableWrite and TableCommit as an abstraction layer above FileStore for writing RowData
2b9c186e is described below

commit 2b9c186e771db18af08bac2a6a2f8cd6f1a084ae
Author: tsreaper <[email protected]>
AuthorDate: Fri Jun 10 10:56:38 2022 +0800

    [FLINK-27973] Introduce TableWrite and TableCommit as an abstraction layer above FileStore for writing RowData
    
    This closes #150
---
 .../connector/sink/BucketStreamPartitioner.java    |   2 +-
 .../connector/sink/CommittableSerializer.java      |   1 +
 .../connector/sink/FileCommittableSerializer.java  |   1 +
 .../store/connector/sink/StoreGlobalCommitter.java |   1 +
 .../table/store/connector/sink/StoreSink.java      |   4 +-
 .../store/connector/sink/StoreSinkWriter.java      |   5 +-
 .../connector/sink/CommittableSerializerTest.java  |   1 +
 .../sink/FileCommittableSerializerTest.java        |   1 +
 .../flink/table/store/file/FileStoreImpl.java      |   4 +
 .../flink/table/store/file/schema/Schema.java      |   9 +-
 .../flink/table/store/log/LogSinkProvider.java     |   2 +-
 .../table/store/table/AbstractFileStoreTable.java  |   9 +
 .../store/table/AppendOnlyFileStoreTable.java      |  42 ++++-
 .../table/ChangelogValueCountFileStoreTable.java   |  55 ++++--
 .../table/ChangelogWithKeyFileStoreTable.java      |  78 +++++++--
 .../flink/table/store/table/FileStoreTable.java    |  14 +-
 .../table/store/table}/sink/FileCommittable.java   |   2 +-
 .../table/store/{ => table}/sink/SinkRecord.java   |   2 +-
 .../{ => table}/sink/SinkRecordConverter.java      |  12 +-
 .../flink/table/store/table/sink/TableCommit.java  |  81 +++++++++
 .../flink/table/store/table/sink/TableWrite.java   | 133 ++++++++++++++
 .../flink/table/store/table/source/TableRead.java  |   2 +
 .../flink/table/store/table/source/TableScan.java  |  23 ++-
 .../store/table/AppendOnlyFileStoreTableTest.java  | 187 ++++++++++++++++++++
 .../ChangelogValueCountFileStoreTableTest.java     | 184 +++++++++++++++++++
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 194 +++++++++++++++++++++
 .../table/store/table/FileStoreTableTestBase.java  | 138 +++++++++++++++
 .../table/store/mapred/TableStoreInputFormat.java  |   4 +-
 .../flink/table/store/FileStoreTestHelper.java     | 141 ---------------
 .../flink/table/store/FileStoreTestUtils.java      |  53 ++++++
 .../hive/TableStoreHiveStorageHandlerITCase.java   | 191 ++++++++------------
 .../store/mapred/TableStoreRecordReaderTest.java   | 111 +++++-------
 .../store/kafka/KafkaLogSerializationSchema.java   |   2 +-
 .../table/store/kafka/KafkaLogSinkProvider.java    |   2 +-
 .../store/kafka/KafkaLogSerializationTest.java     |   2 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   2 +-
 .../table/store/kafka/TestOffsetsLogSink.java      |   2 +-
 37 files changed, 1309 insertions(+), 388 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index dedf5d88..c663b4c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 /** A {@link StreamPartitioner} to partition records by bucket. */
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 95938625..9f6dc1d1 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index ede31b28..3bd03110 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index 20e405e8..f02e9eb1 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 
 import javax.annotation.Nullable;
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index a7ada476..5a3d498c 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.log.LogInitContext;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.sink.SinkRecord;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 
 import javax.annotation.Nullable;
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 5bdb37f5..9a914636 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -31,8 +31,9 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.sink.SinkRecord;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index c451c18a..98db8f5b 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
 import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
index 9bf8e996..73b66f03 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index b4d2a640..ea196f4e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -88,6 +88,10 @@ public class FileStoreImpl implements FileStore {
                 options.fileFormat().getFormatIdentifier());
     }
 
+    public FileStoreOptions options() {
+        return options;
+    }
+
     @VisibleForTesting
     public ManifestFile.Factory manifestFileFactory() {
         return new ManifestFile.Factory(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 6b13cf17..2afc641f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -143,8 +143,13 @@ public class Schema implements Serializable {
         return projectedLogicalRowType(partitionKeys);
     }
 
-    public RowType logicalPrimaryKeysType() {
-        return projectedLogicalRowType(primaryKeys);
+    public RowType logicalTrimmedPrimaryKeysType() {
+        return projectedLogicalRowType(trimmedPrimaryKeys());
+    }
+
+    public int[] projection(List<String> projectedFieldNames) {
+        List<String> fieldNames = fieldNames();
+        return projectedFieldNames.stream().mapToInt(fieldNames::indexOf).toArray();
     }
 
     private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
index 8a091fa2..effa38e8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.log;
 
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import java.io.Serializable;
 import java.util.function.Consumer;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 39b1887d..6d52c0b3 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.types.logical.RowType;
 
 /** Abstract {@link FileStoreTable}. */
@@ -43,4 +45,11 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     public RowType rowType() {
         return schema.logicalRowType();
     }
+
+    @Override
+    public TableCommit newCommit() {
+        return new TableCommit(store().newCommit(), store().newExpire());
+    }
+
+    protected abstract FileStore store();
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 1c0463a7..70e31059 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -19,31 +19,36 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
 
 /** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
 public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private final Schema schema;
     private final FileStoreImpl store;
 
     AppendOnlyFileStoreTable(String name, Schema schema, String user) {
         super(name, schema);
-        this.schema = schema;
         this.store =
                 new FileStoreImpl(
                         schema.id(),
@@ -57,9 +62,8 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan(boolean incremental) {
-        FileStoreScan scan = store.newScan().withIncremental(incremental);
-        return new TableScan(scan, schema, store.pathFactory()) {
+    public TableScan newScan() {
+        return new TableScan(store.newScan(), schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 scan.withValueFilter(predicate);
@@ -68,7 +72,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public TableRead newRead(boolean incremental) {
+    public TableRead newRead() {
         return new TableRead(store.newRead()) {
             @Override
             public TableRead withProjection(int[][] projection) {
@@ -76,6 +80,11 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                 return this;
             }
 
+            @Override
+            public TableRead withIncremental(boolean isIncremental) {
+                return this;
+            }
+
             @Override
             protected RecordReader.RecordIterator<RowData> 
rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -85,7 +94,24 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore fileStore() {
+    public TableWrite newWrite() {
+        SinkRecordConverter recordConverter =
+                new SinkRecordConverter(store.options().bucket(), schema);
+        return new TableWrite(store.newWrite(), recordConverter) {
+            @Override
+            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+                    throws Exception {
+                Preconditions.checkState(
+                        record.row().getRowKind() == RowKind.INSERT,
+                        "Append only writer can not accept row with RowKind %s",
+                        record.row().getRowKind());
+                writer.write(ValueKind.ADD, BinaryRowDataUtil.EMPTY_ROW, record.row());
+            }
+        };
+    }
+
+    @Override
+    public FileStore store() {
         return store;
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 66d8b18b..071e8f56 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -18,19 +18,23 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -64,9 +68,8 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan(boolean incremental) {
-        FileStoreScan scan = store.newScan().withIncremental(incremental);
-        return new TableScan(scan, schema, store.pathFactory()) {
+    public TableScan newScan() {
+        return new TableScan(store.newScan(), schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 scan.withKeyFilter(predicate);
@@ -75,14 +78,14 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public TableRead newRead(boolean incremental) {
-        FileStoreRead read = store.newRead().withDropDelete(!incremental);
-        return new TableRead(read) {
+    public TableRead newRead() {
+        return new TableRead(store.newRead()) {
             private int[][] projection = null;
+            private boolean isIncremental = false;
 
             @Override
             public TableRead withProjection(int[][] projection) {
-                if (incremental) {
+                if (isIncremental) {
                     read.withKeyProjection(projection);
                 } else {
                     this.projection = projection;
@@ -90,6 +93,13 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
                 return this;
             }
 
+            @Override
+            public TableRead withIncremental(boolean isIncremental) {
+                this.isIncremental = isIncremental;
+                read.withDropDelete(!isIncremental);
+                return this;
+            }
+
             @Override
             protected RecordReader.RecordIterator<RowData> 
rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -99,7 +109,32 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore fileStore() {
+    public TableWrite newWrite() {
+        SinkRecordConverter recordConverter =
+                new SinkRecordConverter(store.options().bucket(), schema);
+        return new TableWrite(store.newWrite(), recordConverter) {
+            @Override
+            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+                    throws Exception {
+                switch (record.row().getRowKind()) {
+                    case INSERT:
+                    case UPDATE_AFTER:
+                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
+                        break;
+                    case UPDATE_BEFORE:
+                    case DELETE:
+                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown row kind " + record.row().getRowKind());
+                }
+            }
+        };
+    }
+
+    @Override
+    public FileStore store() {
         return store;
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index af7ae532..fb56416b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -24,22 +24,28 @@ import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
@@ -56,7 +62,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
         // add _KEY_ prefix to avoid conflict with value
         RowType keyType =
                 new RowType(
-                        schema.logicalPrimaryKeysType().getFields().stream()
+                        
schema.logicalTrimmedPrimaryKeysType().getFields().stream()
                                 .map(
                                         f ->
                                                 new RowType.RowField(
@@ -98,26 +104,51 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public TableScan newScan(boolean incremental) {
-        FileStoreScan scan = store.newScan().withIncremental(incremental);
-        return new TableScan(scan, schema, store.pathFactory()) {
+    public TableScan newScan() {
+        return new TableScan(store.newScan(), schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
-                scan.withValueFilter(predicate);
+                // currently we can only perform filter push down on keys
+                // consider this case:
+                //   data file 1: insert key = a, value = 1
+                //   data file 2: update key = a, value = 2
+                //   filter: value = 1
+                // if we perform filter push down on values, data file 1 will be chosen, but data
+                // file 2 will be ignored, and the final result will be key = a, value = 1 while the
+                // correct result is an empty set
+                // TODO support value filter
+                List<String> trimmedPrimaryKeys = schema.trimmedPrimaryKeys();
+                int[] fieldIdxToKeyIdx =
+                        schema.fields().stream()
+                                .mapToInt(f -> trimmedPrimaryKeys.indexOf(f.name()))
+                                .toArray();
+                List<Predicate> keyFilters = new ArrayList<>();
+                for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+                    Optional<Predicate> mapped = mapFilterFields(p, fieldIdxToKeyIdx);
+                    mapped.ifPresent(keyFilters::add);
+                }
+                if (keyFilters.size() > 0) {
+                    scan.withKeyFilter(PredicateBuilder.and(keyFilters));
+                }
             }
         };
     }
 
     @Override
-    public TableRead newRead(boolean incremental) {
-        FileStoreRead read = store.newRead().withDropDelete(!incremental);
-        return new TableRead(read) {
+    public TableRead newRead() {
+        return new TableRead(store.newRead()) {
             @Override
             public TableRead withProjection(int[][] projection) {
                 read.withValueProjection(projection);
                 return this;
             }
 
+            @Override
+            public TableRead withIncremental(boolean isIncremental) {
+                read.withDropDelete(!isIncremental);
+                return this;
+            }
+
             @Override
             protected RecordReader.RecordIterator<RowData> 
rowDataRecordIteratorFromKv(
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -127,7 +158,32 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore fileStore() {
+    public TableWrite newWrite() {
+        SinkRecordConverter recordConverter =
+                new SinkRecordConverter(store.options().bucket(), schema);
+        return new TableWrite(store.newWrite(), recordConverter) {
+            @Override
+            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+                    throws Exception {
+                switch (record.row().getRowKind()) {
+                    case INSERT:
+                    case UPDATE_AFTER:
+                        writer.write(ValueKind.ADD, record.primaryKey(), record.row());
+                        break;
+                    case UPDATE_BEFORE:
+                    case DELETE:
+                        writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown row kind " + record.row().getRowKind());
+                }
+            }
+        };
+    }
+
+    @Override
+    public FileStore store() {
         return store;
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 0c9d148b..65645f76 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.types.logical.RowType;
@@ -36,11 +36,11 @@ public interface FileStoreTable extends Serializable {
 
     RowType rowType();
 
-    TableScan newScan(boolean incremental);
+    TableScan newScan();
 
-    TableRead newRead(boolean incremental);
+    TableRead newRead();
 
-    // TODO remove this once TableWrite is introduced
-    @VisibleForTesting
-    FileStore fileStore();
+    TableWrite newWrite();
+
+    TableCommit newCommit();
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
similarity index 97%
rename from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
rename to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
index e088d1d3..e2a78286 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.connector.sink;
+package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.mergetree.Increment;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
similarity index 97%
rename from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
rename to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
index 4fccccae..e43f74f9 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.sink;
+package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
similarity index 89%
rename from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
rename to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 112f7726..3a2aed45 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.sink;
+package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
@@ -43,6 +44,15 @@ public class SinkRecordConverter {
 
     @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
 
+    /**
+     * Convenience constructor that derives every remaining argument of the full constructor
+     * from a {@link Schema}: the logical row type, and three projections built by
+     * {@code schema.projection(...)} over the partition keys, the trimmed primary keys and the
+     * full primary keys (in that order).
+     *
+     * @param numBucket passed through unchanged to the full constructor
+     * @param schema source of the row type and of the key lists that get projected
+     */
+    public SinkRecordConverter(int numBucket, Schema schema) {
+        this(
+                numBucket,
+                schema.logicalRowType(),
+                schema.projection(schema.partitionKeys()),
+                schema.projection(schema.trimmedPrimaryKeys()),
+                schema.projection(schema.primaryKeys()));
+    }
+
     public SinkRecordConverter(
             int numBucket,
             RowType inputType,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
new file mode 100644
index 00000000..b8f7da95
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide
+ * snapshot commit and expiration.
+ *
+ * <p>Each call to {@link #commit(List)} hands the given committables one by one to the
+ * underlying {@link FileStoreCommit} (as a plain commit, or as an overwrite when an overwrite
+ * partition has been configured) and afterwards calls {@code expire.expire()} once.
+ */
+public class TableCommit {
+
+    // Receives every ManifestCommittable, via commit(...) or overwrite(...).
+    private final FileStoreCommit commit;
+    // Invoked once at the end of every commit(List) call.
+    private final FileStoreExpire expire;
+
+    // null selects commit.commit(...); non-null selects commit.overwrite(...) and is passed
+    // as its first argument. Presumably a partition spec (column -> value) -- TODO confirm.
+    @Nullable private Map<String, String> overwritePartition = null;
+
+    /**
+     * Creates a table commit that delegates to the given file store operations.
+     *
+     * @param commit used to commit / overwrite and to filter already committed committables
+     * @param expire triggered after every {@link #commit(List)}
+     */
+    public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
+        this.commit = commit;
+        this.expire = expire;
+    }
+
+    /**
+     * Switches between plain commit and overwrite mode for subsequent commits.
+     *
+     * @param overwritePartition {@code null} for plain commits; otherwise the map handed to
+     *     {@code FileStoreCommit#overwrite} on every following commit
+     * @return this instance, to allow call chaining
+     */
+    public TableCommit withOverwritePartition(@Nullable Map<String, String> 
overwritePartition) {
+        this.overwritePartition = overwritePartition;
+        return this;
+    }
+
+    /**
+     * Delegates to {@code FileStoreCommit#filterCommitted}.
+     *
+     * @param committables candidates to check
+     * @return whatever {@code filterCommitted} returns for the given list
+     */
+    public List<ManifestCommittable> filterRecoveredCommittables(
+            List<ManifestCommittable> committables) {
+        return commit.filterCommitted(committables);
+    }
+
+    /**
+     * Bundles the given file committables into a single {@link ManifestCommittable} created
+     * with {@code identifier} and commits it through {@link #commit(List)}.
+     *
+     * @param identifier constructor argument of the created {@link ManifestCommittable}
+     * @param fileCommittables per partition/bucket increments to add to the committable
+     */
+    public void commit(String identifier, List<FileCommittable> 
fileCommittables) {
+        ManifestCommittable committable = new ManifestCommittable(identifier);
+        for (FileCommittable fileCommittable : fileCommittables) {
+            committable.addFileCommittable(
+                    fileCommittable.partition(),
+                    fileCommittable.bucket(),
+                    fileCommittable.increment());
+        }
+        commit(Collections.singletonList(committable));
+    }
+
+    /**
+     * Commits (or, in overwrite mode, overwrites with) each committable in list order and then
+     * calls {@code expire.expire()} once, also when the list is empty.
+     *
+     * @param committables committables to hand to the underlying {@link FileStoreCommit}
+     */
+    public void commit(List<ManifestCommittable> committables) {
+        // A fresh empty map is passed as the last argument of every call; its meaning is
+        // defined by FileStoreCommit (presumably extra commit properties -- TODO confirm).
+        if (overwritePartition == null) {
+            for (ManifestCommittable committable : committables) {
+                commit.commit(committable, new HashMap<>());
+            }
+        } else {
+            for (ManifestCommittable committable : committables) {
+                commit.overwrite(overwritePartition, committable, new 
HashMap<>());
+            }
+        }
+        expire.expire();
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
new file mode 100644
index 00000000..c2502be2
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing.
+ *
+ * <p>Incoming rows are converted to {@link SinkRecord}s and routed to one {@link RecordWriter}
+ * per (partition, bucket) pair. Writers are created lazily on first use and are dropped again
+ * in {@link #prepareCommit()} once they report no new files. Subclasses decide, through
+ * {@link #writeSinkRecord(SinkRecord, RecordWriter)}, how a record is handed to its writer.
+ */
+public abstract class TableWrite {
+
+    // Factory for the per-bucket record writers.
+    private final FileStoreWrite write;
+    // Turns a RowData into a SinkRecord carrying partition and bucket.
+    private final SinkRecordConverter recordConverter;
+
+    // partition -> bucket -> writer; only contains writers that are currently open.
+    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+    // Single-thread executor passed to every writer that gets created.
+    private final ExecutorService compactExecutor;
+
+    // Selects createEmptyWriter (true) over createWriter (false) in getWriter.
+    private boolean overwrite = false;
+
+    /**
+     * Stores the collaborators, starts with an empty writer map and creates the single-thread
+     * executor (threads named via {@code ExecutorThreadFactory("compaction-thread")}).
+     *
+     * @param write factory for record writers
+     * @param recordConverter converter applied to every row passed to {@link #write(RowData)}
+     */
+    protected TableWrite(FileStoreWrite write, SinkRecordConverter 
recordConverter) {
+        this.write = write;
+        this.recordConverter = recordConverter;
+
+        this.writers = new HashMap<>();
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
+    }
+
+    /**
+     * Sets how writers created from now on are obtained; already created writers are kept.
+     *
+     * @param overwrite {@code true} to use {@code FileStoreWrite#createEmptyWriter}, {@code
+     *     false} to use {@code FileStoreWrite#createWriter}
+     * @return this instance, to allow call chaining
+     */
+    public TableWrite withOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+        return this;
+    }
+
+    /**
+     * Converts the row to a {@link SinkRecord}, looks up (or creates) the writer for the
+     * record's partition and bucket and passes both to {@link #writeSinkRecord}.
+     *
+     * @param rowData row to write
+     * @throws Exception propagated from {@link #writeSinkRecord}
+     */
+    public void write(RowData rowData) throws Exception {
+        SinkRecord record = recordConverter.convert(rowData);
+        RecordWriter writer = getWriter(record.partition(), record.bucket());
+        writeSinkRecord(record, writer);
+    }
+
+    /**
+     * Calls {@code prepareCommit()} on every open writer and wraps each result, together with
+     * its partition and bucket, into a {@link FileCommittable}.
+     *
+     * <p>A writer whose increment contains no new files is synced, closed and removed from the
+     * writer map; its committable is still part of the returned list. Partitions left without
+     * any writer are removed as well.
+     *
+     * @return one committable per writer that was open when this method was called
+     * @throws Exception propagated from the writers
+     */
+    public List<FileCommittable> prepareCommit() throws Exception {
+        List<FileCommittable> result = new ArrayList<>();
+
+        // Explicit iterators are used so entries can be removed while iterating.
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> 
partIter =
+                writers.entrySet().iterator();
+        while (partIter.hasNext()) {
+            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = 
partIter.next();
+            BinaryRowData partition = partEntry.getKey();
+            Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+                    partEntry.getValue().entrySet().iterator();
+            while (bucketIter.hasNext()) {
+                Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+                int bucket = entry.getKey();
+                RecordWriter writer = entry.getValue();
+                FileCommittable committable =
+                        new FileCommittable(partition, bucket, 
writer.prepareCommit());
+                result.add(committable);
+
+                // Drop writers that produced no new files in this round. Without such a
+                // cleanup the map would keep growing, e.g. with writers for yesterday's
+                // partition that no longer needs to be written.
+                if (committable.increment().newFiles().isEmpty()) {
+                    closeWriter(writer);
+                    bucketIter.remove();
+                }
+            }
+
+            if (partEntry.getValue().isEmpty()) {
+                partIter.remove();
+            }
+        }
+
+        return result;
+    }
+
+    /** Calls {@code sync()} and then {@code close()} on the given writer. */
+    private void closeWriter(RecordWriter writer) throws Exception {
+        writer.sync();
+        writer.close();
+    }
+
+    /**
+     * Shuts down the executor with {@code shutdownNow()}, then syncs and closes every remaining
+     * writer and clears the writer map. Note that the executor is shut down before the writers
+     * are synced.
+     *
+     * @throws Exception propagated from the first writer that fails to sync or close; writers
+     *     after it are not visited
+     */
+    public void close() throws Exception {
+        compactExecutor.shutdownNow();
+        for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
+            for (RecordWriter writer : bucketWriters.values()) {
+                closeWriter(writer);
+            }
+        }
+        writers.clear();
+    }
+
+    /**
+     * Hands a converted record to the writer selected for its partition and bucket.
+     *
+     * @param record record produced by the {@link SinkRecordConverter}
+     * @param writer writer registered for {@code record.partition()} / {@code record.bucket()}
+     * @throws Exception if writing fails
+     */
+    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter 
writer)
+            throws Exception;
+
+    /**
+     * Returns the writer registered for the given partition and bucket, creating and
+     * registering it first if there is none.
+     */
+    private RecordWriter getWriter(BinaryRowData partition, int bucket) {
+        Map<Integer, RecordWriter> buckets = writers.get(partition);
+        if (buckets == null) {
+            buckets = new HashMap<>();
+            // A copy is stored as key, presumably because the caller may reuse the
+            // BinaryRowData instance -- TODO confirm.
+            writers.put(partition.copy(), buckets);
+        }
+        return buckets.computeIfAbsent(
+                bucket,
+                k ->
+                        overwrite
+                                ? write.createEmptyWriter(partition.copy(), 
bucket, compactExecutor)
+                                : write.createWriter(partition.copy(), bucket, 
compactExecutor));
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index be694bbc..53ff22e6 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -50,6 +50,8 @@ public abstract class TableRead {
 
     public abstract TableRead withProjection(int[][] projection);
 
+    public abstract TableRead withIncremental(boolean isIncremental);
+
     public RecordReader<RowData> createReader(
             BinaryRowData partition, int bucket, List<DataFileMeta> files) 
throws IOException {
         return new RowDataRecordReader(read.createReader(partition, bucket, 
files));
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 665a8053..7689902d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.And;
 import org.apache.flink.table.store.file.predicate.CompoundPredicate;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -64,7 +63,7 @@ public abstract class TableScan {
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
         for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = mapToPartitionFilter(p, 
fieldIdxToPartitionIdx);
+            Optional<Predicate> mapped = mapFilterFields(p, 
fieldIdxToPartitionIdx);
             if (mapped.isPresent()) {
                 partitionFilters.add(mapped.get());
             } else {
@@ -72,8 +71,17 @@ public abstract class TableScan {
             }
         }
 
-        scan.withPartitionFilter(new CompoundPredicate(And.INSTANCE, 
partitionFilters));
-        withNonPartitionFilter(new CompoundPredicate(And.INSTANCE, 
nonPartitionFilters));
+        if (partitionFilters.size() > 0) {
+            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+        }
+        if (nonPartitionFilters.size() > 0) {
+            withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
+        }
+        return this;
+    }
+
+    /**
+     * Forwards the flag to the underlying {@code scan} via
+     * {@code scan.withIncremental(isIncremental)}.
+     *
+     * @param isIncremental value passed through unchanged to the underlying scan
+     * @return this {@link TableScan}, to allow call chaining
+     */
+    public TableScan withIncremental(boolean isIncremental) {
+        scan.withIncremental(isIncremental);
         return this;
     }
 
@@ -86,13 +94,12 @@ public abstract class TableScan {
 
     protected abstract void withNonPartitionFilter(Predicate predicate);
 
-    private Optional<Predicate> mapToPartitionFilter(
-            Predicate predicate, int[] fieldIdxToPartitionIdx) {
+    protected Optional<Predicate> mapFilterFields(Predicate predicate, int[] 
fieldIdxMapping) {
         if (predicate instanceof CompoundPredicate) {
             CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
             List<Predicate> children = new ArrayList<>();
             for (Predicate child : compoundPredicate.children()) {
-                Optional<Predicate> mapped = mapToPartitionFilter(child, 
fieldIdxToPartitionIdx);
+                Optional<Predicate> mapped = mapFilterFields(child, 
fieldIdxMapping);
                 if (mapped.isPresent()) {
                     children.add(mapped.get());
                 } else {
@@ -102,7 +109,7 @@ public abstract class TableScan {
             return Optional.of(new 
CompoundPredicate(compoundPredicate.function(), children));
         } else {
             LeafPredicate leafPredicate = (LeafPredicate) predicate;
-            int mapped = fieldIdxToPartitionIdx[leafPredicate.index()];
+            int mapped = fieldIdxMapping[leafPredicate.index()];
             if (mapped >= 0) {
                 return Optional.of(
                         new LeafPredicate(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
new file mode 100644
index 00000000..b69ebb1f
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link AppendOnlyFileStoreTable}.
+ *
+ * <p>Every test first writes the same three commits through {@link #writeData()} and then reads
+ * them back from a table created by {@link #createFileStoreTable()}. Helpers and constants such
+ * as {@code getResult}, {@code binaryRow}, {@code PROJECTION} and the {@code *_TO_STRING}
+ * converters are not defined here; presumably they come from {@link FileStoreTableTestBase}.
+ */
+// TODO enable this test class after append only file store with avro format is fixed
+@Disabled
+public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
+
+    // Directory used as table path by createFileStoreTable().
+    @TempDir java.nio.file.Path tempDir;
+
+    /** Batch read: expects every written row, per partition, in the order it was written. */
+    @Test
+    public void testBatchReadWrite() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList("1|10|100", "1|11|101", "1|12|102", 
"1|11|101", "1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202", 
"2|21|201"));
+    }
+
+    /** Batch read with {@code PROJECTION} applied; expected rows show the last two fields swapped. */
+    @Test
+    public void testBatchProjection() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead().withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("100|10", "101|11", "102|12", 
"101|11", "102|12"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("200|20", "201|21", "202|22", 
"201|21"));
+    }
+
+    /**
+     * Batch read with an equality filter on field index 2 (value 201). The filter is only given
+     * to the scan, so rows sharing a file with a matching row are still expected in the result.
+     */
+    @Test
+    public void testBatchFilter() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, 
Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                "2|21|201",
+                                // this record is in the same file with the 
first "2|21|201"
+                                "2|22|202",
+                                "2|21|201"));
+    }
+
+    /**
+     * Incremental scan and read: the expected rows are exactly those written in the last commit
+     * ("2") of {@link #writeData()}.
+     */
+    @Test
+    public void testStreamingReadWrite() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("+2|21|201"));
+    }
+
+    /** Incremental scan and read with {@code PROJECTION} applied. */
+    @Test
+    public void testStreamingProjection() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = 
table.newScan().withIncremental(true).plan().splits;
+        TableRead read = 
table.newRead().withIncremental(true).withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+101|11", "+102|12"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("+201|21"));
+    }
+
+    /**
+     * Incremental scan with an equality filter on field index 2 (value 101); as in the batch
+     * case, a non-matching row from the same file is still expected.
+     */
+    @Test
+    public void testStreamingFilter() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, 
Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 101L));
+        List<Split> splits =
+                
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                "+1|11|101",
+                                // this record is in the same file with 
"+1|11|101"
+                                "+1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING)).isEmpty();
+    }
+
+    /**
+     * Writes three rows per round for three rounds with a single {@link TableWrite}, committing
+     * each round under the identifiers "0", "1" and "2" through a fresh
+     * {@code table.newCommit()}, and finally closes the writer.
+     */
+    private void writeData() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite();
+
+        write.write(GenericRowData.of(1, 10, 100L));
+        write.write(GenericRowData.of(2, 20, 200L));
+        write.write(GenericRowData.of(1, 11, 101L));
+        table.newCommit().commit("0", write.prepareCommit());
+
+        write.write(GenericRowData.of(1, 12, 102L));
+        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(GenericRowData.of(2, 22, 202L));
+        table.newCommit().commit("1", write.prepareCommit());
+
+        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(GenericRowData.of(1, 12, 102L));
+        table.newCommit().commit("2", write.prepareCommit());
+
+        write.close();
+    }
+
+    /**
+     * Commits a new schema version under the temp directory, configured with file format
+     * "avro" and write mode {@code APPEND_ONLY}, and returns an
+     * {@link AppendOnlyFileStoreTable} built from it.
+     */
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Path tablePath = new Path(tempDir.toString());
+        Configuration conf = new Configuration();
+        conf.set(FileStoreOptions.PATH, tablePath.toString());
+        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        // UpdateSchema arguments are presumably: row type, partition keys, primary keys,
+        // options, comment -- TODO confirm against UpdateSchema.
+        Schema schema =
+                new SchemaManager(tablePath)
+                        .commitNewVersion(
+                                new UpdateSchema(
+                                        ROW_TYPE,
+                                        Collections.singletonList("pt"),
+                                        Collections.emptyList(),
+                                        conf.toMap(),
+                                        ""));
+        return new AppendOnlyFileStoreTable(tablePath.getName(), schema, 
"user");
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
new file mode 100644
index 00000000..bed0cad7
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogValueCountFileStoreTable}. */
+public class ChangelogValueCountFileStoreTableTest extends 
FileStoreTableTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testBatchReadWrite() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|11|101", "1|12|102", 
"1|11|101"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", 
"2|22|202"));
+    }
+
+    @Test
+    public void testBatchProjection() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead().withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("101|11", "102|12", 
"101|11"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("200|20", "201|21", 
"202|22"));
+    }
+
+    @Test
+    public void testBatchFilter() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, 
Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        List<Split> splits = 
table.newScan().withFilter(predicate).plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "2|21|201",
+                                // this record is in the same file with 
"delete 2|21|201"
+                                "2|22|202"));
+    }
+
+    /** Incremental scan and read of both partitions without projection or filter. */
+    @Test
+    public void testStreamingReadWrite() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> deltaSplits = storeTable.newScan().withIncremental(true).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionOneChanges).hasSameElementsAs(Arrays.asList("+1|11|101", "-1|10|100"));
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionTwoChanges).hasSameElementsAs(Arrays.asList("+2|22|202", "-2|21|201"));
+    }
+
+    /** Incremental scan and read of both partitions with {@code PROJECTION} applied. */
+    @Test
+    public void testStreamingProjection() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> deltaSplits = storeTable.newScan().withIncremental(true).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true).withProjection(PROJECTION);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionOneChanges).hasSameElementsAs(Arrays.asList("+101|11", "-100|10"));
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionTwoChanges).hasSameElementsAs(Arrays.asList("+202|22", "-201|21"));
+    }
+
+    /** Incremental scan with an equality filter on the field at index 2. */
+    @Test
+    public void testStreamingFilter() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        // Keep only data whose third field (index 2) may equal 201.
+        Literal literal201 = Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L);
+        Predicate filter = PredicateBuilder.equal(2, literal201);
+        List<Split> deltaSplits =
+                storeTable.newScan().withIncremental(true).withFilter(filter).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionOneChanges).isEmpty();
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING);
+        // "+2|22|202" is expected as well: this record is in the same file with "delete 2|21|201"
+        assertThat(partitionTwoChanges).hasSameElementsAs(Arrays.asList("-2|21|201", "+2|22|202"));
+    }
+
+    /**
+     * Writes the shared fixture: three batches of rows, committed one after another under the
+     * identifiers "0", "1" and "2". The last batch also contains two DELETE rows.
+     *
+     * @throws Exception if creating the table, writing, committing or closing the writer fails
+     */
+    private void writeData() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite();
+
+        // Close the writer even when a write or commit fails, so a failing test does not leak it.
+        try {
+            write.write(GenericRowData.of(1, 10, 100L));
+            write.write(GenericRowData.of(2, 20, 200L));
+            write.write(GenericRowData.of(1, 11, 101L));
+            table.newCommit().commit("0", write.prepareCommit());
+
+            write.write(GenericRowData.of(1, 12, 102L));
+            write.write(GenericRowData.of(2, 21, 201L));
+            write.write(GenericRowData.of(2, 21, 201L));
+            table.newCommit().commit("1", write.prepareCommit());
+
+            write.write(GenericRowData.of(1, 11, 101L));
+            write.write(GenericRowData.of(2, 22, 202L));
+            write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
+            write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
+            table.newCommit().commit("2", write.prepareCommit());
+        } finally {
+            write.close();
+        }
+    }
+
+    /**
+     * Commits a new schema version under {@code tempDir} (partition key {@code pt}, no primary
+     * keys, avro file format, change-log write mode) and returns a table built on it.
+     */
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Path tablePath = new Path(tempDir.toString());
+
+        Configuration options = new Configuration();
+        options.set(FileStoreOptions.PATH, tablePath.toString());
+        options.set(FileStoreOptions.FILE_FORMAT, "avro");
+        options.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        ROW_TYPE,
+                        Collections.singletonList("pt"),
+                        Collections.emptyList(),
+                        options.toMap(),
+                        "");
+        Schema committedSchema = schemaManager.commitNewVersion(updateSchema);
+        return new ChangelogValueCountFileStoreTable(tablePath.getName(), committedSchema, "user");
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
new file mode 100644
index 00000000..e575651b
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
+public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase 
{
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /** Batch scan and read of both partitions without projection or filter. */
+    @Test
+    public void testBatchReadWrite() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> plannedSplits = storeTable.newScan().plan().splits;
+        TableRead reader = storeTable.newRead();
+
+        List<String> partitionOneRows =
+                getResult(reader, plannedSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING);
+        assertThat(partitionOneRows).hasSameElementsAs(Collections.singletonList("1|10|1000"));
+
+        List<String> partitionTwoRows =
+                getResult(reader, plannedSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING);
+        assertThat(partitionTwoRows).hasSameElementsAs(Arrays.asList("2|21|20001", "2|22|202"));
+    }
+
+    /** Batch scan and read of both partitions with {@code PROJECTION} applied. */
+    @Test
+    public void testBatchProjection() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> plannedSplits = storeTable.newScan().plan().splits;
+        TableRead reader = storeTable.newRead().withProjection(PROJECTION);
+
+        List<String> partitionOneRows =
+                getResult(reader, plannedSplits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionOneRows).hasSameElementsAs(Collections.singletonList("1000|10"));
+
+        List<String> partitionTwoRows =
+                getResult(reader, plannedSplits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionTwoRows).hasSameElementsAs(Arrays.asList("20001|21", "202|22"));
+    }
+
+    /** Batch scan with a conjunction of equality filters on the fields at index 2 and 1. */
+    @Test
+    public void testBatchFilter() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        // (field 2 == 201L) AND (field 1 == 21)
+        Literal literal201 = Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L);
+        Predicate fieldTwoIs201 = PredicateBuilder.equal(2, literal201);
+        Literal literal21 = Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21);
+        Predicate fieldOneIs21 = PredicateBuilder.equal(1, literal21);
+        Predicate filter = PredicateBuilder.and(fieldTwoIs201, fieldOneIs21);
+
+        List<Split> filteredSplits = storeTable.newScan().withFilter(filter).plan().splits;
+        TableRead reader = storeTable.newRead();
+
+        List<String> partitionOneRows =
+                getResult(reader, filteredSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING);
+        assertThat(partitionOneRows).isEmpty();
+
+        List<String> partitionTwoRows =
+                getResult(reader, filteredSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING);
+        // only filter on key should be performed,
+        // and records from the same file should also be selected
+        assertThat(partitionTwoRows).hasSameElementsAs(Arrays.asList("2|21|20001", "2|22|202"));
+    }
+
+    /** Incremental scan and read of both partitions without projection or filter. */
+    @Test
+    public void testStreamingReadWrite() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> deltaSplits = storeTable.newScan().withIncremental(true).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionOneChanges).hasSameElementsAs(Collections.singletonList("-1|11|1001"));
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionTwoChanges)
+                .hasSameElementsAs(Arrays.asList("+2|21|20001", "+2|22|202", "-2|20|200"));
+    }
+
+    /** Incremental scan and read of both partitions with {@code PROJECTION} applied. */
+    @Test
+    public void testStreamingProjection() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        List<Split> deltaSplits = storeTable.newScan().withIncremental(true).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true).withProjection(PROJECTION);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionOneChanges).hasSameElementsAs(Collections.singletonList("-1001|11"));
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING);
+        assertThat(partitionTwoChanges)
+                .hasSameElementsAs(Arrays.asList("+20001|21", "+202|22", "-200|20"));
+    }
+
+    /** Incremental scan with a conjunction of equality filters on the fields at index 2 and 1. */
+    @Test
+    public void testStreamingFilter() throws Exception {
+        writeData();
+        FileStoreTable storeTable = createFileStoreTable();
+
+        // (field 2 == 201L) AND (field 1 == 21)
+        Literal literal201 = Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L);
+        Predicate fieldTwoIs201 = PredicateBuilder.equal(2, literal201);
+        Literal literal21 = Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21);
+        Predicate fieldOneIs21 = PredicateBuilder.equal(1, literal21);
+        Predicate filter = PredicateBuilder.and(fieldTwoIs201, fieldOneIs21);
+
+        List<Split> deltaSplits =
+                storeTable.newScan().withIncremental(true).withFilter(filter).plan().splits;
+        TableRead reader = storeTable.newRead().withIncremental(true);
+
+        List<String> partitionOneChanges =
+                getResult(reader, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING);
+        assertThat(partitionOneChanges).isEmpty();
+
+        List<String> partitionTwoChanges =
+                getResult(reader, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING);
+        // only filter on key should be performed,
+        // and records from the same file should also be selected
+        assertThat(partitionTwoChanges)
+                .hasSameElementsAs(Arrays.asList("+2|21|20001", "+2|22|202", "-2|20|200"));
+    }
+
+    /**
+     * Writes the shared fixture: three batches of rows, committed one after another under the
+     * identifiers "0", "1" and "2". Several rows reuse the same {@code (pt, a)} values with a
+     * different {@code b}, and the last batch also contains two DELETE rows.
+     *
+     * @throws Exception if creating the table, writing, committing or closing the writer fails
+     */
+    private void writeData() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite();
+
+        // Close the writer even when a write or commit fails, so a failing test does not leak it.
+        try {
+            write.write(GenericRowData.of(1, 10, 100L));
+            write.write(GenericRowData.of(2, 20, 200L));
+            write.write(GenericRowData.of(1, 11, 101L));
+            table.newCommit().commit("0", write.prepareCommit());
+
+            write.write(GenericRowData.of(1, 10, 1000L));
+            write.write(GenericRowData.of(2, 21, 201L));
+            write.write(GenericRowData.of(2, 21, 2001L));
+            table.newCommit().commit("1", write.prepareCommit());
+
+            write.write(GenericRowData.of(1, 11, 1001L));
+            write.write(GenericRowData.of(2, 21, 20001L));
+            write.write(GenericRowData.of(2, 22, 202L));
+            write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
+            write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
+            table.newCommit().commit("2", write.prepareCommit());
+        } finally {
+            write.close();
+        }
+    }
+
+    /**
+     * Commits a new schema version under {@code tempDir} (partition key {@code pt}, primary keys
+     * {@code pt} and {@code a}, avro file format, change-log write mode) and returns a table built
+     * on it.
+     */
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Path tablePath = new Path(tempDir.toString());
+
+        Configuration options = new Configuration();
+        options.set(FileStoreOptions.PATH, tablePath.toString());
+        options.set(FileStoreOptions.FILE_FORMAT, "avro");
+        options.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        ROW_TYPE,
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "a"),
+                        options.toMap(),
+                        "");
+        Schema committedSchema = schemaManager.commitNewVersion(updateSchema);
+        return new ChangelogWithKeyFileStoreTable(tablePath.getName(), committedSchema, "user");
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
new file mode 100644
index 00000000..0dd8171b
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link FileStoreTable}. */
+public abstract class FileStoreTableTestBase {
+
+    /** Row type used by the tests: INT {@code pt}, INT {@code a}, BIGINT {@code b}. */
+    protected static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.BIGINT().getLogicalType()
+                    },
+                    new String[] {"pt", "a", "b"});
+    /** Field indices 2 then 1 of {@link #ROW_TYPE}, i.e. {@code b} followed by {@code a}. */
+    protected static final int[] PROJECTION = new int[] {2, 1};
+    /** Renders fields 0, 1 and 2 of a row (int, int, long) joined by {@code |}. */
+    protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
+            rowData -> rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + 
rowData.getLong(2);
+    /** Renders fields 0 and 1 of a row (long, int) joined by {@code |}. */
+    protected static final Function<RowData, String> 
BATCH_PROJECTED_ROW_TO_STRING =
+            rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
+    /**
+     * Like {@link #BATCH_ROW_TO_STRING}, prefixed with {@code +} for {@link RowKind#INSERT} rows
+     * and {@code -} for rows of any other kind.
+     */
+    protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
+            rowData ->
+                    (rowData.getRowKind() == RowKind.INSERT ? "+" : "-")
+                            + BATCH_ROW_TO_STRING.apply(rowData);
+    /**
+     * Like {@link #BATCH_PROJECTED_ROW_TO_STRING}, prefixed with {@code +} for {@link
+     * RowKind#INSERT} rows and {@code -} for rows of any other kind.
+     */
+    protected static final Function<RowData, String> 
STREAMING_PROJECTED_ROW_TO_STRING =
+            rowData ->
+                    (rowData.getRowKind() == RowKind.INSERT ? "+" : "-")
+                            + BATCH_PROJECTED_ROW_TO_STRING.apply(rowData);
+
+    /**
+     * Writes one row into each of partitions 1 and 2, then commits an overwrite of partition
+     * {@code pt=2} with a single new row, and checks that partition 1 keeps its row while
+     * partition 2 holds only the new one.
+     */
+    @Test
+    public void testOverwrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        // Each writer is closed in a finally block so a failing write or commit does not leak it.
+        TableWrite initialWrite = table.newWrite();
+        try {
+            initialWrite.write(GenericRowData.of(1, 10, 100L));
+            initialWrite.write(GenericRowData.of(2, 20, 200L));
+            table.newCommit().commit("0", initialWrite.prepareCommit());
+        } finally {
+            initialWrite.close();
+        }
+
+        TableWrite overwrite = table.newWrite().withOverwrite(true);
+        try {
+            overwrite.write(GenericRowData.of(2, 21, 201L));
+            Map<String, String> overwritePartition = new HashMap<>();
+            overwritePartition.put("pt", "2");
+            table.newCommit()
+                    .withOverwritePartition(overwritePartition)
+                    .commit("1", overwrite.prepareCommit());
+        } finally {
+            overwrite.close();
+        }
+
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("1|10|100"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("2|21|201"));
+    }
+
+    /**
+     * Reads all rows of one partition and bucket and renders each of them as a string.
+     *
+     * @param read table read used to create the record reader
+     * @param splits planned splits; only files of splits matching {@code partition} and {@code
+     *     bucket} are read
+     * @param partition partition to read
+     * @param bucket bucket to read
+     * @param rowDataToString converts each row to its string form
+     * @return the rendered rows, in the order the reader returned them
+     * @throws Exception if creating, iterating or closing the reader fails
+     */
+    protected List<String> getResult(
+            TableRead read,
+            List<Split> splits,
+            BinaryRowData partition,
+            int bucket,
+            Function<RowData, String> rowDataToString)
+            throws Exception {
+        RecordReader<RowData> recordReader =
+                read.createReader(partition, bucket, getFilesFor(splits, partition, bucket));
+        RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(recordReader);
+        List<String> result = new ArrayList<>();
+        // Close the iterator even if iteration or the row conversion throws.
+        try {
+            while (iterator.hasNext()) {
+                RowData rowData = iterator.next();
+                result.add(rowDataToString.apply(rowData));
+            }
+        } finally {
+            iterator.close();
+        }
+        return result;
+    }
+
+    /**
+     * Collects the files of every split that belongs to the given partition and bucket, in split
+     * order.
+     */
+    private List<DataFileMeta> getFilesFor(
+            List<Split> splits, BinaryRowData partition, int bucket) {
+        List<DataFileMeta> matchingFiles = new ArrayList<>();
+        for (Split candidate : splits) {
+            boolean samePartition = candidate.partition().equals(partition);
+            if (!samePartition || candidate.bucket() != bucket) {
+                continue;
+            }
+            matchingFiles.addAll(candidate.files());
+        }
+        return matchingFiles;
+    }
+
+    /** Builds a single-field {@link BinaryRowData} whose field 0 holds the int {@code a}. */
+    protected BinaryRowData binaryRow(int a) {
+        BinaryRowData row = new BinaryRowData(1);
+        BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+        rowWriter.writeInt(0, a);
+        rowWriter.complete();
+        return row;
+    }
+
+    /**
+     * Creates the {@link FileStoreTable} under test; each concrete test class supplies its own
+     * table implementation.
+     *
+     * @throws Exception if the table cannot be created
+     */
+    protected abstract FileStoreTable createFileStoreTable() throws Exception;
+}
diff --git 
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
 
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 25f9dcda..8653682f 100644
--- 
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ 
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -50,7 +50,7 @@ public class TableStoreInputFormat implements 
InputFormat<Void, RowDataContainer
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws 
IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
-        TableScan scan = table.newScan(false);
+        TableScan scan = table.newScan();
         createPredicate(jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
                 .map(TableStoreInputSplit::create)
@@ -64,7 +64,7 @@ public class TableStoreInputFormat implements 
InputFormat<Void, RowDataContainer
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
         long splitLength = split.getLength();
         return new TableStoreRecordReader(
-                table.newRead(false).createReader(split.partition(), 
split.bucket(), split.files()),
+                table.newRead().createReader(split.partition(), 
split.bucket(), split.files()),
                 splitLength);
     }
 
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
deleted file mode 100644
index 3d79d694..00000000
--- 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.types.logical.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-
-/** Helper class to write and read {@link RowData} with {@link FileStoreImpl}. 
*/
-public class FileStoreTestHelper {
-
-    private final FileStoreTable table;
-    private final FileStore store;
-    private final BiFunction<RowData, RowData, BinaryRowData> 
partitionCalculator;
-    private final Function<RowData, Integer> bucketCalculator;
-    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
-    private final ExecutorService compactExecutor;
-
-    public FileStoreTestHelper(
-            Configuration conf,
-            RowType rowType,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            BiFunction<RowData, RowData, BinaryRowData> partitionCalculator,
-            Function<RowData, Integer> bucketCalculator)
-            throws Exception {
-        Path tablePath = FileStoreOptions.path(conf);
-        new SchemaManager(tablePath)
-                .commitNewVersion(
-                        new UpdateSchema(rowType, partitionKeys, primaryKeys, 
conf.toMap(), ""));
-
-        // only path, other config should be read from file store.
-        conf = new Configuration();
-        conf.set(PATH, tablePath.toString());
-
-        this.table = FileStoreTableFactory.create(conf, "user");
-        this.store = table.fileStore();
-        this.partitionCalculator = partitionCalculator;
-        this.bucketCalculator = bucketCalculator;
-        this.writers = new HashMap<>();
-        this.compactExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    public void write(ValueKind kind, RowData key, RowData value) throws 
Exception {
-        BinaryRowData partition = partitionCalculator.apply(key, value);
-        int bucket = bucketCalculator.apply(key);
-        RecordWriter writer =
-                writers.compute(partition, (p, m) -> m == null ? new 
HashMap<>() : m)
-                        .compute(
-                                bucket,
-                                (b, w) -> {
-                                    if (w == null) {
-                                        FileStoreWrite write = 
store.newWrite();
-                                        return write.createWriter(
-                                                partition, bucket, 
compactExecutor);
-                                    } else {
-                                        return w;
-                                    }
-                                });
-        writer.write(kind, key, value);
-    }
-
-    public void commit() throws Exception {
-        ManifestCommittable committable = new 
ManifestCommittable(UUID.randomUUID().toString());
-        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> 
entryWithPartition :
-                writers.entrySet()) {
-            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
-                    entryWithPartition.getValue().entrySet()) {
-                RecordWriter writer = entryWithBucket.getValue();
-                writer.sync();
-                Increment increment = writer.prepareCommit();
-                committable.addFileCommittable(
-                        entryWithPartition.getKey(), entryWithBucket.getKey(), 
increment);
-                writer.close();
-            }
-        }
-        writers.clear();
-        FileStoreCommit commit = store.newCommit();
-        commit.commit(committable, Collections.emptyMap());
-    }
-
-    public Tuple2<RecordReader<RowData>, Long> read(BinaryRowData partition, 
int bucket)
-            throws Exception {
-        for (Split split : table.newScan(false).plan().splits) {
-            if (split.partition().equals(partition) && split.bucket() == 
bucket) {
-                return Tuple2.of(
-                        table.newRead(false).createReader(partition, bucket, 
split.files()),
-                        
split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
-            }
-        }
-        throw new IllegalArgumentException(
-                "Input split not found for partition " + partition + " and 
bucket " + bucket);
-    }
-}
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
new file mode 100644
index 00000000..10721be9
--- /dev/null
+++ 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+
+/** Test utils related to {@link org.apache.flink.table.store.file.FileStore}. */
+public class FileStoreTestUtils {
+
+    private FileStoreTestUtils() {
+        // static utility class, not meant to be instantiated
+    }
+
+    /**
+     * Commits a new schema version at the path configured in {@code conf} and creates a table
+     * from a configuration that contains only that path.
+     *
+     * @param conf configuration holding the table path; all of its entries are stored as the
+     *     options of the committed schema
+     * @param rowType row type of the table
+     * @param partitionKeys names of the partition key fields
+     * @param primaryKeys names of the primary key fields
+     * @return the created table
+     * @throws Exception if committing the schema or creating the table fails
+     */
+    public static FileStoreTable createFileStoreTable(
+            Configuration conf,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys)
+            throws Exception {
+        Path tablePath = FileStoreOptions.path(conf);
+        new SchemaManager(tablePath)
+                .commitNewVersion(
+                        new UpdateSchema(rowType, partitionKeys, primaryKeys, conf.toMap(), ""));
+
+        // only path, other config should be read from file store.
+        Configuration pathOnlyConf = new Configuration();
+        pathOnlyConf.set(PATH, tablePath.toString());
+        return FileStoreTableFactory.create(pathOnlyConf, "user");
+    }
+}
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 2ff8d7a3..d0a87ccf 100644
--- 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -24,12 +24,13 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.FileStoreTestHelper;
+import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -85,8 +86,8 @@ public class TableStoreHiveStorageHandlerITCase {
         conf.setString(FileStoreOptions.PATH, path);
         conf.setInteger(FileStoreOptions.BUCKET, 2);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {
@@ -96,35 +97,17 @@ public class TableStoreHiveStorageHandlerITCase {
                                 },
                                 new String[] {"a", "b", "c"}),
                         Collections.emptyList(),
-                        Arrays.asList("a", "b"),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> k.getInt(0) % 2);
+                        Arrays.asList("a", "b"));
 
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 10L),
-                GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 20L),
-                GenericRowData.of(1, 20L, StringData.fromString("Hello")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2, 30L),
-                GenericRowData.of(2, 30L, StringData.fromString("World")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 10L),
-                GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
-        helper.write(
-                ValueKind.DELETE,
-                GenericRowData.of(2, 30L),
-                GenericRowData.of(2, 30L, StringData.fromString("World")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2, 40L),
-                GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        helper.commit();
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
+        write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
+        table.newCommit().commit("0", write.prepareCommit());
+        write.close();
 
         hiveShell.execute(
                 String.join(
@@ -136,13 +119,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  c STRING",
                                 ")",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'",
-                                "TBLPROPERTIES (",
-                                "  'table-store.catalog' = 'test_catalog',",
-                                "  'table-store.primary-keys' = 'a,b',",
-                                "  'table-store.bucket' = '2',",
-                                "  'table-store.file.format' = 'avro'",
-                                ")")));
+                                "LOCATION '" + path + "'")));
         List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
         List<String> expected = Arrays.asList("10\t1\tHi Again", "20\t1\tHello", "40\t2\tTest");
         Assert.assertEquals(expected, actual);
@@ -155,8 +132,8 @@ public class TableStoreHiveStorageHandlerITCase {
         conf.setString(FileStoreOptions.PATH, path);
         conf.setInteger(FileStoreOptions.BUCKET, 2);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {
@@ -166,35 +143,17 @@ public class TableStoreHiveStorageHandlerITCase {
                                 },
                                 new String[] {"a", "b", "c"}),
                         Collections.emptyList(),
-                        Collections.emptyList(),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> k.getInt(0) % 2);
+                        Collections.emptyList());
 
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 10L, StringData.fromString("Hi")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 20L, StringData.fromString("Hello")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2, 30L, StringData.fromString("World")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, 10L, StringData.fromString("Hi")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.DELETE,
-                GenericRowData.of(2, 30L, StringData.fromString("World")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2, 40L, StringData.fromString("Test")),
-                GenericRowData.of(1L));
-        helper.commit();
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
+        write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
+        table.newCommit().commit("0", write.prepareCommit());
+        write.close();
 
         hiveShell.execute(
                 String.join(
@@ -206,12 +165,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  c STRING",
                                 ")",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'",
-                                "TBLPROPERTIES (",
-                                "  'table-store.catalog' = 'test_catalog',",
-                                "  'table-store.bucket' = '2',",
-                                "  'table-store.file.format' = 'avro'",
-                                ")")));
+                                "LOCATION '" + path + "'")));
         List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
         List<String> expected =
                 Arrays.asList("10\t1\tHi", "10\t1\tHi", "20\t1\tHello", "40\t2\tTest");
@@ -224,8 +178,8 @@ public class TableStoreHiveStorageHandlerITCase {
         Configuration conf = new Configuration();
         conf.setString(FileStoreOptions.PATH, root);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 
RandomGenericRowDataGenerator.TYPE_INFOS.stream()
@@ -236,9 +190,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                         .toArray(LogicalType[]::new),
                                 
RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
                         Collections.emptyList(),
-                        Collections.singletonList("f_int"),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> 0);
+                        Collections.singletonList("f_int"));
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
         List<GenericRowData> input = new ArrayList<>();
@@ -252,10 +204,13 @@ public class TableStoreHiveStorageHandlerITCase {
                 }
             }
         }
+
+        TableWrite write = table.newWrite();
         for (GenericRowData rowData : input) {
-            helper.write(ValueKind.ADD, GenericRowData.of(rowData.getInt(3)), rowData);
+            write.write(rowData);
         }
-        helper.commit();
+        table.newCommit().commit("0", write.prepareCommit());
+        write.close();
 
         StringBuilder ddl = new StringBuilder();
         for (int i = 0; i < RandomGenericRowDataGenerator.FIELD_NAMES.size(); i++) {
@@ -365,31 +320,31 @@ public class TableStoreHiveStorageHandlerITCase {
         Configuration conf = new Configuration();
         conf.setString(FileStoreOptions.PATH, path);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {DataTypes.INT().getLogicalType()},
                                 new String[] {"a"}),
                         Collections.emptyList(),
-                        Collections.emptyList(),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> 0);
+                        Collections.emptyList());
 
         // TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
 
-        helper.write(ValueKind.ADD, GenericRowData.of(1), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of((Object) null), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(2), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(3), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of((Object) null), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(4), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(5), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(6), GenericRowData.of(1L));
-        helper.commit();
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1));
+        table.newCommit().commit("0", write.prepareCommit());
+        write.write(GenericRowData.of((Object) null));
+        table.newCommit().commit("1", write.prepareCommit());
+        write.write(GenericRowData.of(2));
+        write.write(GenericRowData.of(3));
+        write.write(GenericRowData.of((Object) null));
+        table.newCommit().commit("2", write.prepareCommit());
+        write.write(GenericRowData.of(4));
+        write.write(GenericRowData.of(5));
+        write.write(GenericRowData.of(6));
+        table.newCommit().commit("3", write.prepareCommit());
+        write.close();
 
         hiveShell.execute(
                 String.join(
@@ -454,8 +409,8 @@ public class TableStoreHiveStorageHandlerITCase {
         Configuration conf = new Configuration();
         conf.setString(FileStoreOptions.PATH, path);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {
@@ -464,30 +419,24 @@ public class TableStoreHiveStorageHandlerITCase {
                                 },
                                 new String[] {"dt", "ts"}),
                         Collections.emptyList(),
-                        Collections.emptyList(),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> 0);
+                        Collections.emptyList());
 
-        helper.write(
-                ValueKind.ADD,
+        TableWrite write = table.newWrite();
+        write.write(
                 GenericRowData.of(
                         375, /* 1971-01-11 */
-                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 5, 17, 17, 29, 20))),
-                GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(null, null), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(376 /* 1971-01-12 */, null),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
+        table.newCommit().commit("0", write.prepareCommit());
+        write.write(GenericRowData.of(null, null));
+        table.newCommit().commit("1", write.prepareCommit());
+        write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
+        write.write(
                 GenericRowData.of(
                         null,
-                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))),
-                GenericRowData.of(1L));
-        helper.commit();
+                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
+        table.newCommit().commit("2", write.prepareCommit());
+        write.close();
 
         hiveShell.execute(
                 String.join(
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 0edb2548..064a239e 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -24,14 +24,19 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.FileStoreTestHelper;
+import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -54,8 +59,8 @@ public class TableStoreRecordReaderTest {
         Configuration conf = new Configuration();
         conf.setString(FileStoreOptions.PATH, tempDir.toString());
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {
@@ -64,33 +69,17 @@ public class TableStoreRecordReaderTest {
                                 },
                                 new String[] {"a", "b"}),
                         Collections.emptyList(),
-                        Collections.singletonList("a"),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> 0);
-
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1L),
-                GenericRowData.of(1L, StringData.fromString("Hi")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2L),
-                GenericRowData.of(2L, StringData.fromString("Hello")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(3L),
-                GenericRowData.of(3L, StringData.fromString("World")));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1L),
-                GenericRowData.of(1L, StringData.fromString("Hi again")));
-        helper.write(
-                ValueKind.DELETE,
-                GenericRowData.of(2L),
-                GenericRowData.of(2L, StringData.fromString("Hello")));
-        helper.commit();
-
-        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+                        Collections.singletonList("a"));
+
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1L, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(2L, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(3L, StringData.fromString("World")));
+        write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
+        table.newCommit().commit("0", write.prepareCommit());
+
+        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
         RowDataContainer container = reader.createValue();
         Set<String> actual = new HashSet<>();
@@ -111,8 +100,8 @@ public class TableStoreRecordReaderTest {
         Configuration conf = new Configuration();
         conf.setString(FileStoreOptions.PATH, tempDir.toString());
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
-        FileStoreTestHelper helper =
-                new FileStoreTestHelper(
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
                                 new LogicalType[] {
@@ -121,37 +110,18 @@ public class TableStoreRecordReaderTest {
                                 },
                                 new String[] {"a", "b"}),
                         Collections.emptyList(),
-                        Collections.emptyList(),
-                        (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
-                        k -> 0);
-
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, StringData.fromString("Hi")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(2, StringData.fromString("Hello")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(3, StringData.fromString("World")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, StringData.fromString("Hi")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.DELETE,
-                GenericRowData.of(2, StringData.fromString("Hello")),
-                GenericRowData.of(1L));
-        helper.write(
-                ValueKind.ADD,
-                GenericRowData.of(1, StringData.fromString("Hi")),
-                GenericRowData.of(1L));
-        helper.commit();
-
-        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+                        Collections.emptyList());
+
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(2, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(3, StringData.fromString("World")));
+        write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+        table.newCommit().commit("0", write.prepareCommit());
+
+        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
         RowDataContainer container = reader.createValue();
         Map<String, Integer> actual = new HashMap<>();
@@ -166,4 +136,17 @@ public class TableStoreRecordReaderTest {
         expected.put("3|World", 1);
         assertThat(actual).isEqualTo(expected);
     }
+
+    private Tuple2<RecordReader<RowData>, Long> read(
+            FileStoreTable table, BinaryRowData partition, int bucket) throws Exception {
+        for (Split split : table.newScan().plan().splits) {
+            if (split.partition().equals(partition) && split.bucket() == bucket) {
+                return Tuple2.of(
+                        table.newRead().createReader(partition, bucket, split.files()),
+                        split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+            }
+        }
+        throw new IllegalArgumentException(
+                "Input split not found for partition " + partition + " and bucket " + bucket);
+    }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index 55fab14e..ccc42576 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.types.RowKind;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index e084bb9d..1f1b79b8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
 import org.apache.flink.table.store.log.LogOptions.LogConsistency;
 import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 1a48f771..37e7aeea 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 40540093..4e920f04 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
index e06d527b..fa4dbdd1 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.Precommitting
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.store.log.LogInitContext;
 import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import java.io.IOException;
 import java.util.Collection;

Reply via email to