This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 38785e0  [FLINK-26530] Introduce TableStore API and refactor ITCases
38785e0 is described below

commit 38785e01bfdc58b05e4d6596d43757386bf5a108
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 11 11:56:41 2022 +0800

    [FLINK-26530] Introduce TableStore API and refactor ITCases
    
    This closes #36
---
 .../flink/table/store/connector/TableStore.java    | 279 +++++++++++++++++++++
 .../connector/sink/BucketStreamPartitioner.java    |  80 ++++++
 .../store/connector/source/FileStoreSource.java    |  38 +--
 .../source/FileStoreSourceSplitGenerator.java      |   4 -
 .../connector/source/LogHybridSourceFactory.java   |  52 ++++
 .../connector/source/PendingSplitsCheckpoint.java  |  22 +-
 .../source/PendingSplitsCheckpointSerializer.java  |   6 +-
 .../source/StaticFileStoreSplitEnumerator.java     |  20 +-
 .../table/store/connector/FileStoreITCase.java     | 202 +++++++--------
 .../store/connector/sink/LogStoreSinkITCase.java   | 134 +++++-----
 .../PendingSplitsCheckpointSerializerTest.java     |  12 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java |   3 +-
 .../table/store/file/operation/FileStoreScan.java  |   3 +
 .../store/file/operation/FileStoreScanImpl.java    |   8 +-
 .../table/store/sink/SinkRecordConverter.java      |  14 +-
 15 files changed, 636 insertions(+), 241 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
new file mode 100644
index 0000000..609cee9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
+import org.apache.flink.table.store.connector.sink.StoreSink;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
+import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.utils.ProjectionUtils;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+
+/** A table store api to create source and sink. */
+@Experimental
+public class TableStore {
+
+    private final Configuration options;
+
+    /** commit user, default uuid. */
+    private String user = UUID.randomUUID().toString();
+
+    /** partition keys, default no partition. */
+    private int[] partitions = new int[0];
+
+    /** primary keys, default no key. */
+    private int[] primaryKeys = new int[0];
+
+    private RowType type;
+
+    private ObjectIdentifier tableIdentifier;
+
+    public TableStore(Configuration options) {
+        this.options = options;
+    }
+
+    public TableStore withUser(String user) {
+        this.user = user;
+        return this;
+    }
+
+    public TableStore withSchema(RowType type) {
+        this.type = type;
+        return this;
+    }
+
+    public TableStore withPartitions(int[] partitions) {
+        this.partitions = partitions;
+        return this;
+    }
+
+    public TableStore withPrimaryKeys(int[] primaryKeys) {
+        this.primaryKeys = primaryKeys;
+        return this;
+    }
+
+    public TableStore withTableIdentifier(ObjectIdentifier tableIdentifier) {
+        this.tableIdentifier = tableIdentifier;
+        return this;
+    }
+
+    public SourceBuilder sourceBuilder() {
+        return new SourceBuilder();
+    }
+
+    public SinkBuilder sinkBuilder() {
+        return new SinkBuilder();
+    }
+
+    private FileStore buildFileStore() {
+        RowType partitionType = ProjectionUtils.project(type, partitions);
+        RowType keyType;
+        RowType valueType;
+        Accumulator accumulator;
+        if (primaryKeys.length == 0) {
+            keyType = type;
+            valueType = RowType.of(new BigIntType(false));
+            accumulator = new ValueCountAccumulator();
+        } else {
+            List<RowType.RowField> fields = ProjectionUtils.project(type, primaryKeys).getFields();
+            // add _KEY_ prefix to avoid conflict with value
+            keyType =
+                    new RowType(
+                            fields.stream()
+                                    .map(
+                                            f ->
+                                                    new RowType.RowField(
+                                                            "_KEY_" + f.getName(),
+                                                            f.getType(),
+                                                            f.getDescription().orElse(null)))
+                                    .collect(Collectors.toList()));
+            valueType = type;
+            accumulator = new DeduplicateAccumulator();
+        }
+        return new FileStoreImpl(options, user, partitionType, keyType, valueType, accumulator);
+    }
+
+    /** Source builder to build a flink {@link Source}. */
+    public class SourceBuilder {
+
+        private boolean isContinuous = false;
+
+        private boolean isHybrid = true;
+
+        @Nullable private int[][] projectedFields;
+
+        @Nullable private Predicate predicate;
+
+        @Nullable private LogSourceProvider logSourceProvider;
+
+        public SourceBuilder withProjection(int[][] projectedFields) {
+            this.projectedFields = projectedFields;
+            return this;
+        }
+
+        public SourceBuilder withPredicate(Predicate predicate) {
+            this.predicate = predicate;
+            return this;
+        }
+
+        public SourceBuilder withContinuousMode(boolean isContinuous) {
+            this.isContinuous = isContinuous;
+            return this;
+        }
+
+        public SourceBuilder withHybridMode(boolean isHybrid) {
+            this.isHybrid = isHybrid;
+            return this;
+        }
+
+        public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
+            this.logSourceProvider = logSourceProvider;
+            return this;
+        }
+
+        private FileStoreSource buildFileStoreSource() {
+            FileStore fileStore = buildFileStore();
+            return new FileStoreSource(
+                    fileStore, primaryKeys.length == 0, projectedFields, predicate);
+        }
+
+        public Source<RowData, ?, ?> build() {
+            if (isContinuous) {
+                if (logSourceProvider == null) {
+                    throw new UnsupportedOperationException(
+                            "File store continuous mode is not supported yet.");
+                }
+
+                // TODO project log source
+
+                if (isHybrid) {
+                    return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
+                                    buildFileStoreSource())
+                            .addSource(
+                                    new LogHybridSourceFactory(logSourceProvider),
+                                    Boundedness.CONTINUOUS_UNBOUNDED)
+                            .build();
+                } else {
+                    return logSourceProvider.createSource(null);
+                }
+            } else {
+                return buildFileStoreSource();
+            }
+        }
+
+        public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+            return env.fromSource(
+                    build(),
+                    WatermarkStrategy.noWatermarks(),
+                    tableIdentifier.asSummaryString(),
+                    InternalTypeInfo.of(type));
+        }
+    }
+
+    /** Sink builder to build a flink sink from input. */
+    public class SinkBuilder {
+
+        private DataStream<RowData> input;
+
+        @Nullable private CatalogLock.Factory lockFactory;
+
+        @Nullable private Map<String, String> overwritePartition;
+
+        @Nullable private LogSinkProvider logSinkProvider;
+
+        public SinkBuilder withInput(DataStream<RowData> input) {
+            this.input = input;
+            return this;
+        }
+
+        public SinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
+            this.lockFactory = lockFactory;
+            return this;
+        }
+
+        public SinkBuilder withOverwritePartition(Map<String, String> overwritePartition) {
+            this.overwritePartition = overwritePartition;
+            return this;
+        }
+
+        public SinkBuilder withLogSinkProvider(LogSinkProvider logSinkProvider) {
+            this.logSinkProvider = logSinkProvider;
+            return this;
+        }
+
+        public DataStreamSink<?> build() {
+            FileStore fileStore = buildFileStore();
+            int numBucket = options.get(BUCKET);
+
+            BucketStreamPartitioner partitioner =
+                    new BucketStreamPartitioner(numBucket, type, partitions, primaryKeys);
+            DataStream<RowData> partitioned =
+                    new DataStream<>(
+                            input.getExecutionEnvironment(),
+                            new PartitionTransformation<>(input.getTransformation(), partitioner));
+
+            StoreSink<?, ?> sink =
+                    new StoreSink<>(
+                            tableIdentifier,
+                            fileStore,
+                            partitions,
+                            primaryKeys,
+                            numBucket,
+                            lockFactory,
+                            overwritePartition,
+                            logSinkProvider);
+            return GlobalCommittingSinkTranslator.translate(partitioned, sink);
+        }
+    }
+}
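
For context, a minimal usage sketch of the builder API added above. The schema, option values, and identifiers below are illustrative assumptions; only the TableStore, SourceBuilder, and SinkBuilder calls come from the new class:

    // Hypothetical example, not part of the commit.
    Configuration options = new Configuration();
    options.set(FileStoreOptions.BUCKET, 3); // number of buckets per partition

    RowType rowType = RowType.of(new IntType(), new VarCharType(), new IntType());

    TableStore store =
            new TableStore(options)
                    .withSchema(rowType)
                    .withPartitions(new int[] {1})   // partition on the VARCHAR field
                    .withPrimaryKeys(new int[] {2})  // primary key on the last INT field
                    .withTableIdentifier(ObjectIdentifier.of("cat", "db", "t"));

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RowData> input =
            env.fromCollection(
                    Collections.<RowData>singletonList(
                            GenericRowData.of(1, StringData.fromString("p1"), 10)),
                    InternalTypeInfo.of(rowType));

    // write: the sink builder applies the bucket shuffle and attaches the StoreSink
    store.sinkBuilder().withInput(input).build();
    env.execute();

    // read back (batch mode): builds a FileStoreSource under the hood
    DataStreamSource<RowData> rows = store.sourceBuilder().build(env);
    rows.print();
    env.execute();

In streaming mode, SourceBuilder.withContinuousMode(true) plus a LogSourceProvider yields a HybridSource that reads the latest file store snapshot first and then switches to the log.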
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
new file mode 100644
index 0000000..a20253b
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+/** A {@link StreamPartitioner} to partition records by bucket. */
+public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
+
+    private final int numBucket;
+    private final RowType inputType;
+    private final int[] partitions;
+    private final int[] primaryKeys;
+
+    private transient SinkRecordConverter recordConverter;
+
+    public BucketStreamPartitioner(
+            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+        this.numBucket = numBucket;
+        this.inputType = inputType;
+        this.partitions = partitions;
+        this.primaryKeys = primaryKeys;
+    }
+
+    @Override
+    public void setup(int numberOfChannels) {
+        super.setup(numberOfChannels);
+        this.recordConverter =
+                new SinkRecordConverter(numBucket, inputType, partitions, primaryKeys);
+    }
+
+    @Override
+    public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
+        RowData row = record.getInstance().getValue();
+        int bucket = recordConverter.bucket(row, recordConverter.key(row));
+        return bucket % numberOfChannels;
+    }
+
+    @Override
+    public StreamPartitioner<RowData> copy() {
+        return this;
+    }
+
+    @Override
+    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+        return SubtaskStateMapper.FULL;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "bucket-assigner";
+    }
+}
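
A note on the routing above: selectChannel() derives the record's bucket from its key (or from the whole row for key-less tables, see the SinkRecordConverter change at the end of this diff) and then folds the bucket onto the available channels. A self-contained sketch of that arithmetic, with made-up parameter names:

    // Illustrative only; mirrors SinkRecordConverter.bucket() and selectChannel() above.
    static int selectChannel(int keyHash, int numBucket, int numberOfChannels) {
        int bucket = Math.abs(keyHash % numBucket); // stable bucket id for the key
        return bucket % numberOfChannels;           // downstream subtask that owns the bucket
    }

Records with the same key therefore always land in the same bucket and on the same writer subtask, so each bucket is written by a single sink task.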
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index ce36204..b595df5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -26,13 +26,14 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 
 import javax.annotation.Nullable;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
 
 /** {@link Source} of file store. */
 public class FileStoreSource
@@ -46,21 +47,17 @@ public class FileStoreSource
 
     @Nullable private final int[][] projectedFields;
 
-    @Nullable private final Predicate partitionPredicate;
-
-    @Nullable private final Predicate fieldsPredicate;
+    @Nullable private final Predicate predicate;
 
     public FileStoreSource(
             FileStore fileStore,
             boolean valueCountMode,
             @Nullable int[][] projectedFields,
-            @Nullable Predicate partitionPredicate,
-            @Nullable Predicate fieldsPredicate) {
+            @Nullable Predicate predicate) {
         this.fileStore = fileStore;
         this.valueCountMode = valueCountMode;
         this.projectedFields = projectedFields;
-        this.partitionPredicate = partitionPredicate;
-        this.fieldsPredicate = fieldsPredicate;
+        this.predicate = predicate;
     }
 
     @Override
@@ -74,6 +71,7 @@ public class FileStoreSource
         FileStoreRead read = fileStore.newRead();
         if (projectedFields != null) {
             if (valueCountMode) {
+                // TODO don't project keys, and add key projection to split reader
                 read.withKeyProjection(projectedFields);
             } else {
                 read.withValueProjection(projectedFields);
@@ -86,15 +84,14 @@ public class FileStoreSource
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context) {
         FileStoreScan scan = fileStore.newScan();
-        if (partitionPredicate != null) {
-            scan.withPartitionFilter(partitionPredicate);
-        }
-        if (fieldsPredicate != null) {
-            if (valueCountMode) {
-                scan.withKeyFilter(fieldsPredicate);
-            } else {
-                scan.withValueFilter(fieldsPredicate);
-            }
+        if (predicate != null) {
+            // TODO split predicate into partitionPredicate and fieldsPredicate
+            //            scan.withPartitionFilter(partitionPredicate);
+            //            if (keyAsRecord) {
+            //                scan.withKeyFilter(fieldsPredicate);
+            //            } else {
+            //                scan.withValueFilter(fieldsPredicate);
+            //            }
         }
         return new StaticFileStoreSplitEnumerator(context, scan);
     }
@@ -103,8 +100,11 @@ public class FileStoreSource
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
-        checkArgument(checkpoint.nextSnapshotId() == -1);
-        return new StaticFileStoreSplitEnumerator(context, checkpoint.splits());
+        Snapshot snapshot = null;
+        if (checkpoint.currentSnapshotId() != INVALID_SNAPSHOT) {
+            snapshot = fileStore.newScan().snapshot(checkpoint.currentSnapshotId());
+        }
+        return new StaticFileStoreSplitEnumerator(context, snapshot, checkpoint.splits());
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index 217119d..f9b64ab 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -35,10 +35,6 @@ public class FileStoreSourceSplitGenerator {
      */
     private final char[] currentId = "0000000000".toCharArray();
 
-    public List<FileStoreSourceSplit> createSplits(FileStoreScan scan) {
-        return createSplits(scan.plan());
-    }
-
     public List<FileStoreSourceSplit> createSplits(FileStoreScan.Plan plan) {
         return plan.groupByPartFiles().entrySet().stream()
                 .flatMap(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
new file mode 100644
index 0000000..0f892fc
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.connector.base.source.hybrid.HybridSource.SourceFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.log.LogSourceProvider;
+
+import java.util.Map;
+
+/** Log {@link SourceFactory} from {@link StaticFileStoreSplitEnumerator}. */
+public class LogHybridSourceFactory
+        implements SourceFactory<RowData, Source<RowData, ?, ?>, StaticFileStoreSplitEnumerator> {
+
+    private final LogSourceProvider provider;
+
+    public LogHybridSourceFactory(LogSourceProvider provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public Source<RowData, ?, ?> create(
+            HybridSource.SourceSwitchContext<StaticFileStoreSplitEnumerator> context) {
+        StaticFileStoreSplitEnumerator enumerator = context.getPreviousEnumerator();
+        Snapshot snapshot = enumerator.snapshot();
+        Map<Integer, Long> logOffsets = null;
+        if (snapshot != null) {
+            // TODO
+            // logOffsets = snapshot.getLogOffsets();
+        }
+        return provider.createSource(logOffsets);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
index 666cdf8..10c2b23 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
@@ -26,30 +26,24 @@ import java.util.Collection;
  */
 public class PendingSplitsCheckpoint {
 
+    public static final long INVALID_SNAPSHOT = -1L;
+
     /** The splits in the checkpoint. */
     private final Collection<FileStoreSourceSplit> splits;
 
-    private final long nextSnapshotId;
+    private final long currentSnapshotId;
 
-    private PendingSplitsCheckpoint(Collection<FileStoreSourceSplit> splits, long nextSnapshotId) {
+    public PendingSplitsCheckpoint(
+            Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
         this.splits = splits;
-        this.nextSnapshotId = nextSnapshotId;
+        this.currentSnapshotId = currentSnapshotId;
     }
 
     public Collection<FileStoreSourceSplit> splits() {
         return splits;
     }
 
-    public long nextSnapshotId() {
-        return nextSnapshotId;
-    }
-
-    public static PendingSplitsCheckpoint fromStatic(Collection<FileStoreSourceSplit> splits) {
-        return new PendingSplitsCheckpoint(splits, -1);
-    }
-
-    public static PendingSplitsCheckpoint fromContinuous(
-            Collection<FileStoreSourceSplit> splits, long nextSnapshotId) {
-        return new PendingSplitsCheckpoint(splits, nextSnapshotId);
+    public long currentSnapshotId() {
+        return currentSnapshotId;
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
index b38748e..b3df0ac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
@@ -52,7 +52,7 @@ public class PendingSplitsCheckpointSerializer
             view.writeInt(bytes.length);
             view.write(bytes);
         }
-        view.writeLong(pendingSplitsCheckpoint.nextSnapshotId());
+        view.writeLong(pendingSplitsCheckpoint.currentSnapshotId());
         return out.toByteArray();
     }
 
@@ -67,7 +67,7 @@ public class PendingSplitsCheckpointSerializer
             view.readFully(bytes);
             splits.add(splitSerializer.deserialize(version, bytes));
         }
-        long nextSnapshotId = view.readLong();
-        return PendingSplitsCheckpoint.fromContinuous(splits, nextSnapshotId);
+        long currentSnapshotId = view.readLong();
+        return new PendingSplitsCheckpoint(splits, currentSnapshotId);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 6a884b2..4fc828b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 
 import javax.annotation.Nullable;
@@ -30,25 +31,34 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+
 /** A SplitEnumerator implementation for bounded / batch {@link FileStoreSource} input. */
 public class StaticFileStoreSplitEnumerator
         implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
 
     private final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
+    @Nullable private final Snapshot snapshot;
+
     private final Queue<FileStoreSourceSplit> splits;
 
     public StaticFileStoreSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Snapshot snapshot,
             Collection<FileStoreSourceSplit> splits) {
         this.context = context;
+        this.snapshot = snapshot;
         this.splits = new LinkedList<>(splits);
     }
 
     public StaticFileStoreSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context, FileStoreScan scan) {
         this.context = context;
-        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(scan));
+        FileStoreScan.Plan plan = scan.plan();
+        Long snapshotId = plan.snapshotId();
+        this.snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(plan));
     }
 
     @Override
@@ -83,11 +93,17 @@ public class StaticFileStoreSplitEnumerator
 
     @Override
     public PendingSplitsCheckpoint snapshotState(long checkpointId) {
-        return PendingSplitsCheckpoint.fromStatic(new ArrayList<>(splits));
+        return new PendingSplitsCheckpoint(
+                new ArrayList<>(splits), snapshot == null ? INVALID_SNAPSHOT : snapshot.id());
     }
 
     @Override
     public void close() {
         // no resources to close
     }
+
+    @Nullable
+    public Snapshot snapshot() {
+        return snapshot;
+    }
 }
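
To tie the enumerator, checkpoint, and source changes together, a rough sketch of how the snapshot id now round-trips through a checkpoint (method names are taken from this diff; the enumerator, context, and fileStore variables are contextual placeholders):

    // checkpoint: remaining splits plus the snapshot id (INVALID_SNAPSHOT when none)
    PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(checkpointId);

    // restore (see FileStoreSource.restoreEnumerator above): only a valid id is re-resolved,
    // so LogHybridSourceFactory can eventually derive the matching log offsets (still a TODO)
    Snapshot snapshot = null;
    if (checkpoint.currentSnapshotId() != PendingSplitsCheckpoint.INVALID_SNAPSHOT) {
        snapshot = fileStore.newScan().snapshot(checkpoint.currentSnapshotId());
    }
    StaticFileStoreSplitEnumerator restored =
            new StaticFileStoreSplitEnumerator(context, snapshot, checkpoint.splits());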
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 221e85c..7c4a884 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -19,11 +19,10 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -32,13 +31,8 @@ import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.connector.sink.StoreSink;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
 import org.apache.flink.table.store.connector.source.FileStoreSource;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -49,18 +43,19 @@ import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Assume;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
@@ -71,13 +66,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @RunWith(Parameterized.class)
 public class FileStoreITCase extends AbstractTestBase {
 
-    private static final RowType PARTITION_TYPE =
-            new RowType(Collections.singletonList(new RowType.RowField("p", new VarCharType())));
-
-    private static final RowType KEY_TYPE =
-            new RowType(Collections.singletonList(new RowType.RowField("k", new IntType())));
-
-    public static final RowType VALUE_TYPE =
+    public static final RowType TABLE_TYPE =
             new RowType(
                     Arrays.asList(
                             new RowType.RowField("v", new IntType()),
@@ -89,14 +78,15 @@ public class FileStoreITCase extends AbstractTestBase {
     public static final DataStructureConverter<RowData, Row> CONVERTER =
             (DataStructureConverter)
                     DataStructureConverters.getConverter(
-                            TypeConversions.fromLogicalToDataType(VALUE_TYPE));
+                            TypeConversions.fromLogicalToDataType(TABLE_TYPE));
 
     private static final int NUM_BUCKET = 3;
 
-    private static final List<RowData> SOURCE_DATA =
+    public static final List<RowData> SOURCE_DATA =
             Arrays.asList(
                     wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
                     wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
                     wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
                     wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
                     wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
@@ -104,8 +94,14 @@ public class FileStoreITCase extends AbstractTestBase {
 
     private final boolean isBatch;
 
-    public FileStoreITCase(boolean isBatch) {
+    private final StreamExecutionEnvironment env;
+
+    private final TableStore store;
+
+    public FileStoreITCase(boolean isBatch) throws IOException {
         this.isBatch = isBatch;
+        this.env = isBatch ? buildBatchEnv() : buildStreamEnv();
+        this.store = buildTableStore(isBatch, TEMPORARY_FOLDER);
     }
 
     @Parameterized.Parameters(name = "isBatch-{0}")
@@ -114,83 +110,103 @@ public class FileStoreITCase extends AbstractTestBase {
     }
 
     private static SerializableRowData wrap(RowData row) {
-        return new SerializableRowData(row, InternalSerializers.create(VALUE_TYPE));
+        return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE));
     }
 
     @Test
     public void testPartitioned() throws Exception {
-        innerTest(true);
+        store.withPartitions(new int[] {1});
+
+        // write
+        store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+        env.execute();
+
+        // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+        // assert
+        Row[] expected =
+                new Row[] {
+                    Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)
+                };
+        assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
     @Test
     public void testNonPartitioned() throws Exception {
-        innerTest(false);
+        store.withPartitions(new int[0]);
+
+        // write
+        store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+        env.execute();
+
+        // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+        // assert
+        Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+        assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
     @Test
     public void testOverwrite() throws Exception {
         Assume.assumeTrue(isBatch);
+        store.withPartitions(new int[] {1});
 
-        StreamExecutionEnvironment env = buildBatchEnv();
-        FileStore fileStore =
-                buildFileStore(buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), true);
-
-        // sink
-        DataStreamSource<RowData> finiteSource = buildTestSource(env, true);
-        write(finiteSource, fileStore, true);
+        // write
+        store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+        env.execute();
 
         // overwrite p2
-        finiteSource =
+        DataStreamSource<RowData> partialData =
                 env.fromCollection(
                         Collections.singletonList(
+                                wrap(GenericRowData.of(9, StringData.fromString("p2"), 5))),
-                        InternalTypeInfo.of(VALUE_TYPE));
+                        InternalTypeInfo.of(TABLE_TYPE));
         Map<String, String> overwrite = new HashMap<>();
         overwrite.put("p", "p2");
-        write(finiteSource, fileStore, true, overwrite);
+        store.sinkBuilder().withInput(partialData).withOverwritePartition(overwrite).build();
+        env.execute();
 
         // read
-        List<Row> results = read(env, fileStore);
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
 
         Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
 
         // overwrite all
-        finiteSource =
+        partialData =
                 env.fromCollection(
                         Collections.singletonList(
+                                wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
-                        InternalTypeInfo.of(VALUE_TYPE));
-        write(finiteSource, fileStore, true, new HashMap<>());
+                        InternalTypeInfo.of(TABLE_TYPE));
+        store.sinkBuilder().withInput(partialData).withOverwritePartition(new HashMap<>()).build();
+        env.execute();
 
         // read
-        results = read(env, fileStore);
+        results = executeAndCollect(store.sourceBuilder().build(env));
         expected = new Row[] {Row.of(19, "p2", 6)};
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
-    private void innerTest(boolean partitioned) throws Exception {
-        StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
-        FileStore fileStore =
-                buildFileStore(
-                        buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), partitioned);
-
-        // sink
-        DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
-        write(finiteSource, fileStore, partitioned);
+    @Test
+    public void testPartitionedNonKey() throws Exception {
+        store.withPartitions(new int[] {1}).withPrimaryKeys(new int[0]);
 
-        // source
-        List<Row> results = read(env, fileStore);
+        // write
+        store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+        env.execute();
 
-        Row[] expected =
-                partitioned
-                        ? new Row[] {
-                            Row.of(5, "p2", 1),
-                            Row.of(3, "p2", 5),
-                            Row.of(5, "p1", 1),
-                            Row.of(0, "p1", 2)
-                        }
-                        : new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+        // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+        // assert
+        // in streaming mode, expect origin data X 2 (FiniteTestSource)
+        Stream<RowData> expectedStream =
+                isBatch
+                        ? SOURCE_DATA.stream()
+                        : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+        Row[] expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
@@ -209,6 +225,14 @@ public class FileStoreITCase extends AbstractTestBase {
         return env;
     }
 
+    public static TableStore buildTableStore(boolean noFail, TemporaryFolder temporaryFolder)
+            throws IOException {
+        return new TableStore(buildConfiguration(noFail, temporaryFolder.newFolder()))
+                .withSchema(TABLE_TYPE)
+                .withPrimaryKeys(new int[] {2})
+                .withTableIdentifier(ObjectIdentifier.of("catalog", "db", "t"));
+    }
+
     public static Configuration buildConfiguration(boolean noFail, File folder) {
         Configuration options = new Configuration();
         options.set(BUCKET, NUM_BUCKET);
@@ -222,72 +246,16 @@ public class FileStoreITCase extends AbstractTestBase {
         return options;
     }
 
-    public static FileStore buildFileStore(Configuration options, boolean partitioned) {
-        return new FileStoreImpl(
-                options,
-                "user",
-                partitioned ? PARTITION_TYPE : RowType.of(),
-                KEY_TYPE,
-                VALUE_TYPE,
-                new DeduplicateAccumulator());
-    }
-
     public static DataStreamSource<RowData> buildTestSource(
             StreamExecutionEnvironment env, boolean isBatch) {
         return isBatch
-                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(VALUE_TYPE))
+                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE))
                 : env.addSource(
-                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
-    }
-
-    public static void write(DataStream<RowData> input, FileStore fileStore, boolean partitioned)
-            throws Exception {
-        write(input, fileStore, partitioned, null);
-    }
-
-    public static void write(
-            DataStream<RowData> input,
-            FileStore fileStore,
-            boolean partitioned,
-            @Nullable Map<String, String> overwritePartition)
-            throws Exception {
-        write(input, fileStore, partitioned, overwritePartition, null);
-    }
-
-    public static void write(
-            DataStream<RowData> input,
-            FileStore fileStore,
-            boolean partitioned,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkProvider logSinkProvider)
-            throws Exception {
-        int[] partitions = partitioned ? new int[] {1} : new int[0];
-        int[] keys = new int[] {2};
-        StoreSink<?, ?> sink =
-                new StoreSink<>(
-                        null,
-                        fileStore,
-                        partitions,
-                        keys,
-                        NUM_BUCKET,
-                        null,
-                        overwritePartition,
-                        logSinkProvider);
-        input = input.keyBy(row -> row.getInt(2)); // key by
-        GlobalCommittingSinkTranslator.translate(input, sink);
-        input.getExecutionEnvironment().execute();
+                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(TABLE_TYPE));
     }
 
-    public static List<Row> read(StreamExecutionEnvironment env, FileStore fileStore)
-            throws Exception {
-        FileStoreSource source = new FileStoreSource(fileStore, false, null, null, null);
-        CloseableIterator<RowData> iterator =
-                env.fromSource(
-                                source,
-                                WatermarkStrategy.noWatermarks(),
-                                "source",
-                                InternalTypeInfo.of(VALUE_TYPE))
-                        .executeAndCollect();
+    public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+        CloseableIterator<RowData> iterator = source.executeAndCollect();
         List<Row> results = new ArrayList<>();
         while (iterator.hasNext()) {
             results.add(CONVERTER.toExternal(iterator.next()));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 023b4b6..c4e8eb4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.connector.FileStoreITCase;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.connector.TableStore;
 import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
 import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
 import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
@@ -37,12 +34,16 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.flink.table.store.connector.FileStoreITCase.CONVERTER;
+import static org.apache.flink.table.store.connector.FileStoreITCase.SOURCE_DATA;
+import static org.apache.flink.table.store.connector.FileStoreITCase.TABLE_TYPE;
 import static org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildConfiguration;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileStore;
 import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildTableStore;
 import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
+import static org.apache.flink.table.store.connector.FileStoreITCase.executeAndCollect;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
@@ -53,33 +54,48 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
 
     @Test
     public void testStreamingPartitioned() throws Exception {
-        innerTest("testStreamingPartitioned", false, true, true);
+        innerTest("testStreamingPartitioned", false, true, true, true);
     }
 
     @Test
     public void testStreamingNonPartitioned() throws Exception {
-        innerTest("testStreamingNonPartitioned", false, false, true);
+        innerTest("testStreamingNonPartitioned", false, false, true, true);
     }
 
     @Test
     public void testBatchPartitioned() throws Exception {
-        innerTest("testBatchPartitioned", true, true, true);
+        innerTest("testBatchPartitioned", true, true, true, true);
     }
 
     @Test
     public void testStreamingEventual() throws Exception {
-        innerTest("testStreamingEventual", false, true, false);
+        innerTest("testStreamingEventual", false, true, false, true);
     }
 
-    private void innerTest(String name, boolean isBatch, boolean partitioned, boolean transaction)
+    @Test
+    public void testStreamingPartitionedNonKey() throws Exception {
+        innerTest("testStreamingPartitionedNonKey", false, true, true, false);
+    }
+
+    @Test
+    public void testBatchPartitionedNonKey() throws Exception {
+        innerTest("testBatchPartitionedNonKey", true, true, true, false);
+    }
+
+    private void innerTest(
+            String name, boolean isBatch, boolean partitioned, boolean transaction, boolean keyed)
             throws Exception {
         StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
 
         // in eventual mode, failure will result in duplicate data
-        FileStore fileStore =
-                buildFileStore(
-                        buildConfiguration(isBatch || !transaction, TEMPORARY_FOLDER.newFolder()),
-                        partitioned);
+        TableStore store = buildTableStore(isBatch || !transaction, TEMPORARY_FOLDER);
+        if (partitioned) {
+            store.withPartitions(new int[] {1});
+        }
+
+        if (!keyed) {
+            store.withPrimaryKeys(new int[0]);
+        }
 
         // prepare log
         DynamicTableFactory.Context context =
@@ -90,8 +106,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                         transaction
                                 ? LogOptions.LogConsistency.TRANSACTIONAL
                                 : LogOptions.LogConsistency.EVENTUAL,
-                        FileStoreITCase.VALUE_TYPE,
-                        new int[] {2});
+                        TABLE_TYPE,
+                        keyed ? new int[] {2} : new int[0]);
 
         KafkaLogStoreFactory factory = discoverKafkaLogFactory();
         KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
@@ -102,64 +118,44 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
 
         try {
             // write
-            DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
-            FileStoreITCase.write(finiteSource, fileStore, partitioned, null, sinkProvider);
+            store.sinkBuilder()
+                    .withInput(buildTestSource(env, isBatch))
+                    .withLogSinkProvider(sinkProvider)
+                    .build();
+            env.execute();
 
             // read
-            List<Row> results = FileStoreITCase.read(env, fileStore);
-
-            Row[] expected =
-                    partitioned
-                            ? new Row[] {
-                                Row.of(5, "p2", 1),
-                                Row.of(3, "p2", 5),
-                                Row.of(5, "p1", 1),
-                                Row.of(0, "p1", 2)
-                            }
-                            : new Row[] {
-                                Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
-                            };
-            assertThat(results).containsExactlyInAnyOrder(expected);
+            List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
 
-            results =
-                    buildStreamEnv()
-                            .fromSource(
-                                    sourceProvider.createSource(null),
-                                    WatermarkStrategy.noWatermarks(),
-                                    "source")
-                            .executeAndCollect(isBatch ? 6 : 12).stream()
-                            .map(FileStoreITCase.CONVERTER::toExternal)
-                            .collect(Collectors.toList());
-
-            if (isBatch) {
+            Row[] expected;
+            if (keyed) {
                 expected =
-                        new Row[] {
-                            Row.of(0, "p1", 1),
-                            Row.of(0, "p1", 2),
-                            Row.of(5, "p1", 1),
-                            Row.of(6, "p2", 1),
-                            Row.of(3, "p2", 5),
-                            Row.of(5, "p2", 1)
-                        };
+                        partitioned
+                                ? new Row[] {
+                                    Row.of(5, "p2", 1),
+                                    Row.of(3, "p2", 5),
+                                    Row.of(5, "p1", 1),
+                                    Row.of(0, "p1", 2)
+                                }
+                                : new Row[] {
+                                    Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
+                                };
             } else {
-                // read log
-                // expect origin data X 2 (FiniteTestSource)
-                expected =
-                        new Row[] {
-                            Row.of(0, "p1", 1),
-                            Row.of(0, "p1", 2),
-                            Row.of(5, "p1", 1),
-                            Row.of(6, "p2", 1),
-                            Row.of(3, "p2", 5),
-                            Row.of(5, "p2", 1),
-                            Row.of(0, "p1", 1),
-                            Row.of(0, "p1", 2),
-                            Row.of(5, "p1", 1),
-                            Row.of(6, "p2", 1),
-                            Row.of(3, "p2", 5),
-                            Row.of(5, "p2", 1)
-                        };
+                Stream<RowData> expectedStream =
+                        isBatch
+                                ? SOURCE_DATA.stream()
+                                : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+                expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
             }
+
+            assertThat(results).containsExactlyInAnyOrder(expected);
+
+            results =
+                    store.sourceBuilder().withContinuousMode(true)
+                            .withLogSourceProvider(sourceProvider).build(buildStreamEnv())
+                            .executeAndCollect(expected.length).stream()
+                            .map(CONVERTER::toExternal)
+                            .collect(Collectors.toList());
             assertThat(results).containsExactlyInAnyOrder(expected);
         } finally {
             factory.onDropTable(context, true);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 879d908..5112244 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -38,7 +38,7 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void serializeEmptyCheckpoint() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                PendingSplitsCheckpoint.fromStatic(Collections.emptyList());
+                new PendingSplitsCheckpoint(Collections.emptyList(), 5);
 
         final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
 
@@ -48,8 +48,8 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void serializeSomeSplits() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                PendingSplitsCheckpoint.fromStatic(
-                        Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
+                new PendingSplitsCheckpoint(
+                        Arrays.asList(testSplit1(), testSplit2(), testSplit3()), 3);
 
         final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
 
@@ -59,7 +59,7 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void serializeSplitsAndContinuous() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                PendingSplitsCheckpoint.fromContinuous(
+                new PendingSplitsCheckpoint(
                         Arrays.asList(testSplit1(), testSplit2(), testSplit3()), 20);
 
         final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
@@ -70,7 +70,7 @@ public class PendingSplitsCheckpointSerializerTest {
     @Test
     public void repeatedSerialization() throws Exception {
         final PendingSplitsCheckpoint checkpoint =
-                PendingSplitsCheckpoint.fromStatic(Arrays.asList(testSplit3(), testSplit1()));
+                new PendingSplitsCheckpoint(Arrays.asList(testSplit3(), testSplit1()), 5);
 
         serializeAndDeserialize(checkpoint);
         serializeAndDeserialize(checkpoint);
@@ -112,6 +112,6 @@ public class PendingSplitsCheckpointSerializerTest {
     private static void assertCheckpointsEqual(
             final PendingSplitsCheckpoint expected, final PendingSplitsCheckpoint actual) {
         assertThat(actual.splits()).isEqualTo(expected.splits());
-        assertThat(actual.nextSnapshotId()).isEqualTo(expected.nextSnapshotId());
+        assertThat(actual.currentSnapshotId()).isEqualTo(expected.currentSnapshotId());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
index 96c6bd6..7fe861d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -102,7 +102,6 @@ public class StaticFileStoreSplitEnumeratorTest {
     private static StaticFileStoreSplitEnumerator createEnumerator(
             final SplitEnumeratorContext<FileStoreSourceSplit> context,
             final FileStoreSourceSplit... splits) {
-
-        return new StaticFileStoreSplitEnumerator(context, Arrays.asList(splits));
+        return new StaticFileStoreSplitEnumerator(context, null, Arrays.asList(splits));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index ab8e6ce..06dad34 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
@@ -37,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
 
+    Snapshot snapshot(long snapshotId);
+
     FileStoreScan withPartitionFilter(Predicate predicate);
 
     FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index d99a792..b5d1370 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -75,6 +75,11 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     @Override
+    public Snapshot snapshot(long snapshotId) {
+        return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+    }
+
+    @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
         this.partitionFilter = predicate;
         return this;
@@ -154,8 +159,7 @@ public class FileStoreScanImpl implements FileStoreScan {
             if (snapshotId == null) {
                 manifests = Collections.emptyList();
             } else {
-                Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
-                manifests = snapshot.readAllManifests(manifestList);
+                manifests = snapshot(snapshotId).readAllManifests(manifestList);
             }
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 55c1bf8..0413535 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -49,12 +49,20 @@ public class SinkRecordConverter {
 
     public SinkRecord convert(RowData row) {
         BinaryRowData partition = partProjection.apply(row);
-        BinaryRowData key = keyProjection.apply(row);
-        int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
-        int bucket = Math.abs(hash % numBucket);
+        BinaryRowData key = key(row);
+        int bucket = bucket(row, key);
         return new SinkRecord(partition, bucket, key, row);
     }
 
+    public BinaryRowData key(RowData row) {
+        return keyProjection.apply(row);
+    }
+
+    public int bucket(RowData row, BinaryRowData key) {
+        int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
+        return Math.abs(hash % numBucket);
+    }
+
     private int hashRow(RowData row) {
         if (row instanceof BinaryRowData) {
             RowKind rowKind = row.getRowKind();
