This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 422c409b [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store
422c409b is described below

commit 422c409b36efcd0e75fdde61b0601f405b18e3ef
Author: tsreaper <[email protected]>
AuthorDate: Wed Dec 14 14:55:05 2022 +0800

    [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store
    
    This closes #435.
---
 .../connector/sink/CommittableStateManager.java    |  37 +++++
 .../store/connector/sink/CommitterOperator.java    |  60 ++------
 .../table/store/connector/sink/CompactorSink.java  |  55 +++++++
 ...kSinkBuilder.java => CompactorSinkBuilder.java} |  43 ++----
 .../table/store/connector/sink/FileStoreSink.java  |  78 ++++++++++
 .../sink/{StoreSink.java => FlinkSink.java}        | 102 ++++++-------
 .../store/connector/sink/FlinkSinkBuilder.java     |   5 +-
 .../connector/sink/HashRowStreamPartitioner.java   |  71 +++++++++
 .../sink/NoopCommittableStateManager.java          |  46 ++++++
 .../RestoreAndFailCommittableStateManager.java     |  95 ++++++++++++
 .../store/connector/sink/StoreCompactOperator.java |   2 +-
 .../table/store/connector/FileStoreITCase.java     |   4 +-
 .../connector/sink/CommitterOperatorTest.java      | 159 ++++++++++++---------
 .../connector/sink/CommitterOperatorTestBase.java  | 105 ++++++++++++++
 .../store/connector/sink/CompactorSinkITCase.java  | 138 ++++++++++++++++++
 .../store/connector/sink/SinkSavepointITCase.java  |   2 +-
 .../connector/source/CompactorSourceITCase.java    |   2 +-
 .../table/store/table/system/BucketsTable.java     |  17 +--
 18 files changed, 791 insertions(+), 230 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
new file mode 100644
index 00000000..8ef8d5dc
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Helper interface for {@link CommitterOperator}. This interface manages 
operator states about
+ * {@link org.apache.flink.table.store.file.manifest.ManifestCommittable}.
+ */
+public interface CommittableStateManager extends Serializable {
+
+    void initializeState(StateInitializationContext context, Committer 
committer) throws Exception;
+
+    void snapshotState(StateSnapshotContext context, List<ManifestCommittable> 
committables)
+            throws Exception;
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index ac55befd..509adac5 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -17,21 +17,15 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.util.function.SerializableFunction;
-import org.apache.flink.util.function.SerializableSupplier;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -44,7 +38,7 @@ import java.util.TreeMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Committer operator to commit {@link Committable}. */
+/** Operator to commit {@link Committable}s for each snapshot. */
 public class CommitterOperator extends AbstractStreamOperator<Committable>
         implements OneInputStreamOperator<Committable, Committable>, 
BoundedOneInput {
 
@@ -69,22 +63,17 @@ public class CommitterOperator extends 
AbstractStreamOperator<Committable>
     private final String initialCommitUser;
 
     /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, ManifestCommittable> 
committablesPerCheckpoint;
-
-    /** The committable's serializer. */
-    private final 
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
-            committableSerializer;
-
-    /** ManifestCommittable state of this job. Used to filter out previous 
successful commits. */
-    private ListState<ManifestCommittable> streamingCommitterState;
+    protected final NavigableMap<Long, ManifestCommittable> 
committablesPerCheckpoint;
 
     private final SerializableFunction<String, Committer> committerFactory;
 
+    private final CommittableStateManager committableStateManager;
+
     /**
      * Aggregate committables to global committables and commit the global 
committables to the
      * external system.
      */
-    private Committer committer;
+    protected Committer committer;
 
     private boolean endInput = false;
 
@@ -92,13 +81,12 @@ public class CommitterOperator extends 
AbstractStreamOperator<Committable>
             boolean streamingCheckpointEnabled,
             String initialCommitUser,
             SerializableFunction<String, Committer> committerFactory,
-            
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
-                    committableSerializer) {
+            CommittableStateManager committableStateManager) {
         this.streamingCheckpointEnabled = streamingCheckpointEnabled;
         this.initialCommitUser = initialCommitUser;
-        this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
         this.committerFactory = checkNotNull(committerFactory);
+        this.committableStateManager = committableStateManager;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -115,35 +103,7 @@ public class CommitterOperator extends 
AbstractStreamOperator<Committable>
         // parallelism of commit operator is always 1, so commitUser will 
never be null
         committer = committerFactory.apply(commitUser);
 
-        streamingCommitterState =
-                new SimpleVersionedListState<>(
-                        context.getOperatorStateStore()
-                                .getListState(
-                                        new ListStateDescriptor<>(
-                                                
"streaming_committer_raw_states",
-                                                
BytePrimitiveArraySerializer.INSTANCE)),
-                        committableSerializer.get());
-        List<ManifestCommittable> restored = new ArrayList<>();
-        streamingCommitterState.get().forEach(restored::add);
-        streamingCommitterState.clear();
-        commit(true, restored);
-    }
-
-    private void commit(boolean isRecover, List<ManifestCommittable> 
committables)
-            throws Exception {
-        if (isRecover) {
-            committables = committer.filterRecoveredCommittables(committables);
-            if (!committables.isEmpty()) {
-                committer.commit(committables);
-                throw new RuntimeException(
-                        "This exception is intentionally thrown "
-                                + "after committing the restored checkpoints. "
-                                + "By restarting the job we hope that "
-                                + "writers can start writing based on these 
new commits.");
-            }
-        } else {
-            committer.commit(committables);
-        }
+        committableStateManager.initializeState(context, committer);
     }
 
     private ManifestCommittable toCommittables(long checkpoint, 
List<Committable> inputs)
@@ -155,7 +115,7 @@ public class CommitterOperator extends 
AbstractStreamOperator<Committable>
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         pollInputs();
-        
streamingCommitterState.update(committables(committablesPerCheckpoint));
+        committableStateManager.snapshotState(context, 
committables(committablesPerCheckpoint));
     }
 
     private List<ManifestCommittable> committables(NavigableMap<Long, 
ManifestCommittable> map) {
@@ -182,7 +142,7 @@ public class CommitterOperator extends 
AbstractStreamOperator<Committable>
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, ManifestCommittable> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        commit(false, committables(headMap));
+        committer.commit(committables(headMap));
         headMap.clear();
     }
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
new file mode 100644
index 00000000..4de06f34
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.util.function.SerializableFunction;
+
+/** {@link FlinkSink} for stand-alone compact jobs. */
+public class CompactorSink extends FlinkSink {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Lock.Factory lockFactory;
+
+    public CompactorSink(FileStoreTable table, Lock.Factory lockFactory) {
+        super(table, false);
+        this.lockFactory = lockFactory;
+    }
+
+    @Override
+    protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
+        return new StoreCompactOperator(table, writeProvider, isStreaming);
+    }
+
+    @Override
+    protected SerializableFunction<String, Committer> createCommitterFactory(
+            boolean streamingCheckpointEnabled) {
+        return user -> new 
StoreCommitter(table.newCommit(user).withLock(lockFactory.create()));
+    }
+
+    @Override
+    protected CommittableStateManager createCommittableStateManager() {
+        return new NoopCommittableStateManager();
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
similarity index 59%
copy from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
copy to 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
index 9107e19d..aee210ae 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
@@ -25,62 +25,39 @@ import 
org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.system.BucketsTable;
 
-import javax.annotation.Nullable;
-
-import java.util.Map;
-
-/** Sink builder to build a flink sink from input. */
-public class FlinkSinkBuilder {
+/** Builder for {@link CompactorSink}. */
+public class CompactorSinkBuilder {
 
     private final FileStoreTable table;
 
     private DataStream<RowData> input;
     private Lock.Factory lockFactory = Lock.emptyFactory();
-    @Nullable private Map<String, String> overwritePartition;
-    @Nullable private LogSinkFunction logSinkFunction;
-    @Nullable private Integer parallelism;
 
-    public FlinkSinkBuilder(FileStoreTable table) {
+    public CompactorSinkBuilder(FileStoreTable table) {
         this.table = table;
     }
 
-    public FlinkSinkBuilder withInput(DataStream<RowData> input) {
+    public CompactorSinkBuilder withInput(DataStream<RowData> input) {
         this.input = input;
         return this;
     }
 
-    public FlinkSinkBuilder withLockFactory(Lock.Factory lockFactory) {
+    public CompactorSinkBuilder withLockFactory(Lock.Factory lockFactory) {
         this.lockFactory = lockFactory;
         return this;
     }
 
-    public FlinkSinkBuilder withOverwritePartition(Map<String, String> 
overwritePartition) {
-        this.overwritePartition = overwritePartition;
-        return this;
-    }
-
-    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction 
logSinkFunction) {
-        this.logSinkFunction = logSinkFunction;
-        return this;
-    }
-
-    public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
-        this.parallelism = parallelism;
-        return this;
-    }
-
     public DataStreamSink<?> build() {
-        BucketStreamPartitioner partitioner = new 
BucketStreamPartitioner(table.schema());
+        HashRowStreamPartitioner partitioner =
+                new HashRowStreamPartitioner(
+                        
BucketsTable.rowType(table.schema().logicalPartitionType()));
         PartitionTransformation<RowData> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), 
partitioner);
-        if (parallelism != null) {
-            partitioned.setParallelism(parallelism);
-        }
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        StoreSink sink = new StoreSink(table, lockFactory, overwritePartition, 
logSinkFunction);
+        CompactorSink sink = new CompactorSink(table, lockFactory);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
new file mode 100644
index 00000000..0151f18d
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.util.function.SerializableFunction;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** {@link FlinkSink} for writing records into table store. */
+public class FileStoreSink extends FlinkSink {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Lock.Factory lockFactory;
+    @Nullable private final Map<String, String> overwritePartition;
+    @Nullable private final LogSinkFunction logSinkFunction;
+
+    public FileStoreSink(
+            FileStoreTable table,
+            Lock.Factory lockFactory,
+            @Nullable Map<String, String> overwritePartition,
+            @Nullable LogSinkFunction logSinkFunction) {
+        super(table, overwritePartition != null);
+        this.lockFactory = lockFactory;
+        this.overwritePartition = overwritePartition;
+        this.logSinkFunction = logSinkFunction;
+    }
+
+    @Override
+    protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
+        return new StoreWriteOperator(table, logSinkFunction, writeProvider);
+    }
+
+    @Override
+    protected SerializableFunction<String, Committer> createCommitterFactory(
+            boolean streamingCheckpointEnabled) {
+        // If checkpoint is enabled for streaming job, we have to
+        // commit new files list even if they're empty.
+        // Otherwise we can't tell if the commit is successful after
+        // a restart.
+        return user ->
+                new StoreCommitter(
+                        table.newCommit(user)
+                                .withOverwritePartition(overwritePartition)
+                                
.withCreateEmptyCommit(streamingCheckpointEnabled)
+                                .withLock(lockFactory.create()));
+    }
+
+    @Override
+    protected CommittableStateManager createCommittableStateManager() {
+        return new 
RestoreAndFailCommittableStateManager(ManifestCommittableSerializer::new);
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
similarity index 62%
rename from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
rename to 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index 0823e6a0..3e9ee855 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -33,75 +33,46 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import 
org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
+import org.apache.flink.util.function.SerializableFunction;
 
 import java.io.Serializable;
-import java.util.Map;
 import java.util.UUID;
 
-/** Sink of dynamic store. */
-public class StoreSink implements Serializable {
+/** Abstract sink of table store. */
+public abstract class FlinkSink implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     private static final String WRITER_NAME = "Writer";
-
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
-    private final FileStoreTable table;
-    private final Lock.Factory lockFactory;
-    @Nullable private final Map<String, String> overwritePartition;
-    @Nullable private final LogSinkFunction logSinkFunction;
+    protected final FileStoreTable table;
+    private final boolean isOverwrite;
 
-    public StoreSink(
-            FileStoreTable table,
-            Lock.Factory lockFactory,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction) {
+    public FlinkSink(FileStoreTable table, boolean isOverwrite) {
         this.table = table;
-        this.lockFactory = lockFactory;
-        this.overwritePartition = overwritePartition;
-        this.logSinkFunction = logSinkFunction;
+        this.isOverwrite = isOverwrite;
     }
 
-    private OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            String initialCommitUser) {
-        boolean isOverwrite = overwritePartition != null;
-        StoreSinkWrite.Provider writeProvider;
+    protected StoreSinkWrite.Provider createWriteProvider(String 
initialCommitUser) {
         if (table.options().changelogProducer() == 
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
             long fullCompactionThresholdMs =
                     
table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
-            writeProvider =
-                    (table, context, ioManager) ->
-                            new FullChangelogStoreSinkWrite(
-                                    table,
-                                    context,
-                                    initialCommitUser,
-                                    ioManager,
-                                    isOverwrite,
-                                    fullCompactionThresholdMs);
+            return (table, context, ioManager) ->
+                    new FullChangelogStoreSinkWrite(
+                            table,
+                            context,
+                            initialCommitUser,
+                            ioManager,
+                            isOverwrite,
+                            fullCompactionThresholdMs);
         } else {
-            writeProvider =
-                    (table, context, ioManager) ->
-                            new StoreSinkWriteImpl(
-                                    table, context, initialCommitUser, 
ioManager, isOverwrite);
+            return (table, context, ioManager) ->
+                    new StoreSinkWriteImpl(
+                            table, context, initialCommitUser, ioManager, 
isOverwrite);
         }
-
-        return new StoreWriteOperator(table, logSinkFunction, writeProvider);
-    }
-
-    private StoreCommitter createCommitter(String user, boolean 
createEmptyCommit) {
-        return new StoreCommitter(
-                table.newCommit(user)
-                        .withOverwritePartition(overwritePartition)
-                        .withCreateEmptyCommit(createEmptyCommit)
-                        .withLock(lockFactory.create()));
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
@@ -116,18 +87,23 @@ public class StoreSink implements Serializable {
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
 
-        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
-        SingleOutputStreamOperator<Committable> written =
-                input.transform(WRITER_NAME, typeInfo, 
createWriteOperator(initialCommitUser))
-                        .setParallelism(input.getParallelism());
-
+        boolean isStreaming =
+                conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         boolean streamingCheckpointEnabled =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING
-                        && checkpointConfig.isCheckpointingEnabled();
+                isStreaming && checkpointConfig.isCheckpointingEnabled();
         if (streamingCheckpointEnabled) {
             assertCheckpointConfiguration(env);
         }
 
+        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+        SingleOutputStreamOperator<Committable> written =
+                input.transform(
+                                WRITER_NAME,
+                                typeInfo,
+                                createWriteOperator(
+                                        
createWriteProvider(initialCommitUser), isStreaming))
+                        .setParallelism(input.getParallelism());
+
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME,
@@ -135,12 +111,8 @@ public class StoreSink implements Serializable {
                                 new CommitterOperator(
                                         streamingCheckpointEnabled,
                                         initialCommitUser,
-                                        // If checkpoint is enabled for 
streaming job, we have to
-                                        // commit new files list even if 
they're empty.
-                                        // Otherwise we can't tell if the 
commit is successful after
-                                        // a restart.
-                                        user -> createCommitter(user, 
streamingCheckpointEnabled),
-                                        ManifestCommittableSerializer::new))
+                                        
createCommitterFactory(streamingCheckpointEnabled),
+                                        createCommittableStateManager()))
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
@@ -158,4 +130,12 @@ public class StoreSink implements Serializable {
                         + 
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
+
+    protected abstract OneInputStreamOperator<RowData, Committable> 
createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming);
+
+    protected abstract SerializableFunction<String, Committer> 
createCommitterFactory(
+            boolean streamingCheckpointEnabled);
+
+    protected abstract CommittableStateManager createCommittableStateManager();
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 9107e19d..d93e1577 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-/** Sink builder to build a flink sink from input. */
+/** Builder for {@link FileStoreSink}. */
 public class FlinkSinkBuilder {
 
     private final FileStoreTable table;
@@ -80,7 +80,8 @@ public class FlinkSinkBuilder {
         }
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        StoreSink sink = new StoreSink(table, lockFactory, overwritePartition, 
logSinkFunction);
+        FileStoreSink sink =
+                new FileStoreSink(table, lockFactory, overwritePartition, 
logSinkFunction);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
new file mode 100644
index 00000000..f69443a0
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link StreamPartitioner} to partition {@link RowData} according to its 
hash value. */
+public class HashRowStreamPartitioner extends StreamPartitioner<RowData> {
+
+    private final RowType rowType;
+
+    private transient RowDataSerializer serializer;
+
+    public HashRowStreamPartitioner(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public void setup(int numberOfChannels) {
+        super.setup(numberOfChannels);
+        serializer = new RowDataSerializer(rowType);
+    }
+
+    @Override
+    public int selectChannel(SerializationDelegate<StreamRecord<RowData>> 
record) {
+        int hash = 
serializer.toBinaryRow(record.getInstance().getValue()).hashCode();
+        return Math.abs(hash) % numberOfChannels;
+    }
+
+    @Override
+    public StreamPartitioner<RowData> copy() {
+        return this;
+    }
+
+    @Override
+    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+        return SubtaskStateMapper.FULL;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "compactor-stream-partitioner";
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
new file mode 100644
index 00000000..2f45f998
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/**
+ * A {@link CommittableStateManager} which does nothing. If a commit attempt 
fails, it will be lost
+ * after the job restarts.
+ *
+ * <p>Useful for committing optional snapshots, for example COMPACT snapshots 
produced by a separate
+ * compact job.
+ */
+public class NoopCommittableStateManager implements CommittableStateManager {
+
+    @Override
+    public void initializeState(StateInitializationContext context, Committer 
committer)
+            throws Exception {
+        // nothing to do
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context, 
List<ManifestCommittable> committables)
+            throws Exception {
+        // nothing to do
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
new file mode 100644
index 00000000..524ad999
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link CommittableStateManager} which stores uncommitted {@link 
ManifestCommittable}s in state.
+ *
+ * <p>When the job restarts, these {@link ManifestCommittable}s will be 
restored and committed, then
+ * an intentional failure will occur, hoping that after the job restarts, all 
writers can start writing
+ * based on the restored snapshot.
+ *
+ * <p>Useful for committing snapshots containing records, for example 
snapshots produced by table
+ * store writers.
+ */
+public class RestoreAndFailCommittableStateManager implements 
CommittableStateManager {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The committable's serializer. */
+    private final 
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+            committableSerializer;
+
+    /** ManifestCommittable state of this job. Used to filter out previous 
successful commits. */
+    private ListState<ManifestCommittable> streamingCommitterState;
+
+    public RestoreAndFailCommittableStateManager(
+            
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+                    committableSerializer) {
+        this.committableSerializer = committableSerializer;
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context, Committer 
committer)
+            throws Exception {
+        streamingCommitterState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(
+                                        new ListStateDescriptor<>(
+                                                
"streaming_committer_raw_states",
+                                                
BytePrimitiveArraySerializer.INSTANCE)),
+                        committableSerializer.get());
+        List<ManifestCommittable> restored = new ArrayList<>();
+        streamingCommitterState.get().forEach(restored::add);
+        streamingCommitterState.clear();
+        recover(restored, committer);
+    }
+
+    private void recover(List<ManifestCommittable> committables, Committer 
committer)
+            throws Exception {
+        committables = committer.filterRecoveredCommittables(committables);
+        if (!committables.isEmpty()) {
+            committer.commit(committables);
+            throw new RuntimeException(
+                    "This exception is intentionally thrown "
+                            + "after committing the restored checkpoints. "
+                            + "By restarting the job we hope that "
+                            + "writers can start writing based on these new 
commits.");
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context, 
List<ManifestCommittable> committables)
+            throws Exception {
+        streamingCommitterState.update(committables);
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 2cdfad51..9ac2957a 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -55,7 +55,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator {
         Preconditions.checkArgument(
                 !table.options().writeCompactionSkip(),
                 CoreOptions.WRITE_COMPACTION_SKIP.key()
-                        + " should not be true for StoreCompactOperator. This 
is unexpected.");
+                        + " should not be true for StoreCompactOperator.");
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
         this.isStreaming = isStreaming;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index b4e8bb72..86d5dab0 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -33,8 +33,8 @@ import 
org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.FileStoreSink;
 import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
-import org.apache.flink.table.store.connector.sink.StoreSink;
 import org.apache.flink.table.store.connector.source.ContinuousFileStoreSource;
 import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
 import org.apache.flink.table.store.connector.source.StaticFileStoreSource;
@@ -80,7 +80,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} 
and {@link
- * StoreSink}.
+ * FileStoreSink}.
  */
 @RunWith(Parameterized.class)
 public class FileStoreITCase extends AbstractTestBase {
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
index f176132d..075dd1d6 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -20,68 +20,44 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
 import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for {@link CommitterOperator}. */
-public class CommitterOperatorTest {
+public class CommitterOperatorTest extends CommitterOperatorTestBase {
 
-    private static final RowType ROW_TYPE =
-            RowType.of(
-                    new LogicalType[] {
-                        DataTypes.INT().getLogicalType(), 
DataTypes.BIGINT().getLogicalType()
-                    },
-                    new String[] {"a", "b"});
-
-    @TempDir public java.nio.file.Path tempDir;
-    private Path tablePath;
     private String initialCommitUser;
 
     @BeforeEach
     public void before() {
-        tablePath = new Path(tempDir.toString());
+        super.before();
         initialCommitUser = UUID.randomUUID().toString();
     }
 
+    // ------------------------------------------------------------------------
+    //  Recoverable operator tests
+    // ------------------------------------------------------------------------
+
     @Test
     public void testFailIntentionallyAfterRestore() throws Exception {
         FileStoreTable table = createFileStoreTable();
 
         OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
-                createTestHarness(table);
+                createRecoverableTestHarness(table);
         testHarness.open();
 
         TableWrite write = table.newWrite(initialCommitUser);
@@ -97,7 +73,7 @@ public class CommitterOperatorTest {
         OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
         assertThat(table.snapshotManager().latestSnapshotId()).isNull();
 
-        testHarness = createTestHarness(table);
+        testHarness = createRecoverableTestHarness(table);
         try {
             // commit snapshot from state, fail intentionally
             testHarness.initializeState(snapshot);
@@ -114,7 +90,7 @@ public class CommitterOperatorTest {
         assertResults(table, "1, 10", "2, 20");
 
         // snapshot is successfully committed, no failure is needed
-        testHarness = createTestHarness(table);
+        testHarness = createRecoverableTestHarness(table);
         testHarness.initializeState(snapshot);
         testHarness.open();
         assertResults(table, "1, 10", "2, 20");
@@ -124,7 +100,7 @@ public class CommitterOperatorTest {
     public void testCheckpointAbort() throws Exception {
         FileStoreTable table = createFileStoreTable();
         OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
-                createTestHarness(table);
+                createRecoverableTestHarness(table);
         testHarness.open();
 
         // files from multiple checkpoint
@@ -151,53 +127,94 @@ public class CommitterOperatorTest {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
     }
 
-    private void assertResults(FileStoreTable table, String... expected) {
-        TableRead read = table.newRead();
-        List<String> actual = new ArrayList<>();
-        table.newScan()
-                .plan()
-                .splits
-                .forEach(
-                        s -> {
-                            try {
-                                RecordReader<RowData> recordReader = 
read.createReader(s);
-                                CloseableIterator<RowData> it =
-                                        new 
RecordReaderIterator<>(recordReader);
-                                while (it.hasNext()) {
-                                    RowData row = it.next();
-                                    actual.add(row.getInt(0) + ", " + 
row.getLong(1));
-                                }
-                                it.close();
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-        Collections.sort(actual);
-        assertThat(actual).isEqualTo(Arrays.asList(expected));
+    // ------------------------------------------------------------------------
+    //  Lossy operator tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testSnapshotLostWhenFailed() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createLossyTestHarness(table);
+        testHarness.open();
+
+        long timestamp = 1;
+
+        // this checkpoint is notified, should be committed
+        TableWrite write = table.newWrite(initialCommitUser);
+        write.write(GenericRowData.of(1, 10L));
+        write.write(GenericRowData.of(2, 20L));
+        for (FileCommittable committable : write.prepareCommit(false, 1)) {
+            testHarness.processElement(
+                    new Committable(1, Committable.Kind.FILE, committable), 
timestamp++);
+        }
+        testHarness.snapshot(1, timestamp++);
+        testHarness.notifyOfCompletedCheckpoint(1);
+
+        // this checkpoint is not notified, should not be committed
+        write.write(GenericRowData.of(3, 30L));
+        write.write(GenericRowData.of(4, 40L));
+        for (FileCommittable committable : write.prepareCommit(false, 2)) {
+            testHarness.processElement(
+                    new Committable(2, Committable.Kind.FILE, committable), 
timestamp++);
+        }
+        OperatorSubtaskState snapshot = testHarness.snapshot(2, timestamp++);
+
+        // reopen test harness
+        write.close();
+        testHarness.close();
+
+        testHarness = createLossyTestHarness(table);
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // this checkpoint is notified, should be committed
+        write = table.newWrite(initialCommitUser);
+        write.write(GenericRowData.of(5, 50L));
+        write.write(GenericRowData.of(6, 60L));
+        for (FileCommittable committable : write.prepareCommit(false, 3)) {
+            testHarness.processElement(
+                    new Committable(3, Committable.Kind.FILE, committable), 
timestamp++);
+        }
+        testHarness.snapshot(3, timestamp++);
+        testHarness.notifyOfCompletedCheckpoint(3);
+
+        write.close();
+        testHarness.close();
+
+        assertResults(table, "1, 10", "2, 20", "5, 50", "6, 60");
     }
 
-    private FileStoreTable createFileStoreTable() throws Exception {
-        Configuration conf = new Configuration();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        SchemaManager schemaManager = new SchemaManager(tablePath);
-        schemaManager.commitNewVersion(
-                new UpdateSchema(
-                        ROW_TYPE,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        conf.toMap(),
-                        ""));
-        return FileStoreTableFactory.create(conf);
+    // ------------------------------------------------------------------------
+    //  Test utils
+    // ------------------------------------------------------------------------
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable>
+            createRecoverableTestHarness(FileStoreTable table) throws 
Exception {
+        CommitterOperator operator =
+                new CommitterOperator(
+                        true,
+                        initialCommitUser,
+                        user -> new StoreCommitter(table.newCommit(user)),
+                        new RestoreAndFailCommittableStateManager(
+                                ManifestCommittableSerializer::new));
+        return createTestHarness(operator);
     }
 
-    private OneInputStreamOperatorTestHarness<Committable, Committable> 
createTestHarness(
+    private OneInputStreamOperatorTestHarness<Committable, Committable> 
createLossyTestHarness(
             FileStoreTable table) throws Exception {
         CommitterOperator operator =
                 new CommitterOperator(
                         true,
                         initialCommitUser,
                         user -> new StoreCommitter(table.newCommit(user)),
-                        ManifestCommittableSerializer::new);
+                        new NoopCommittableStateManager());
+        return createTestHarness(operator);
+    }
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable> 
createTestHarness(
+            CommitterOperator operator) throws Exception {
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
         OneInputStreamOperatorTestHarness<Committable, Committable> harness =
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
new file mode 100644
index 00000000..01ed9563
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base test class for {@link CommitterOperator} with different {@link 
CommittableStateManager} implementations.
+ */
+public abstract class CommitterOperatorTestBase {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(), 
DataTypes.BIGINT().getLogicalType()
+                    },
+                    new String[] {"a", "b"});
+
+    @TempDir public java.nio.file.Path tempDir;
+    protected Path tablePath;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(tempDir.toString());
+    }
+
+    protected void assertResults(FileStoreTable table, String... expected) {
+        TableRead read = table.newRead();
+        List<String> actual = new ArrayList<>();
+        table.newScan()
+                .plan()
+                .splits
+                .forEach(
+                        s -> {
+                            try {
+                                RecordReader<RowData> recordReader = 
read.createReader(s);
+                                CloseableIterator<RowData> it =
+                                        new 
RecordReaderIterator<>(recordReader);
+                                while (it.hasNext()) {
+                                    RowData row = it.next();
+                                    actual.add(row.getInt(0) + ", " + 
row.getLong(1));
+                                }
+                                it.close();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        Collections.sort(actual);
+        assertThat(actual).isEqualTo(Arrays.asList(expected));
+    }
+
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        conf.toMap(),
+                        ""));
+        return FileStoreTableFactory.create(conf);
+    }
+}
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
new file mode 100644
index 00000000..774074cb
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+/** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
+public class CompactorSinkITCase extends AbstractTestBase {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType()
+                    },
+                    new String[] {"dt", "hh", "k", "v"});
+
+    private Path tablePath;
+    private String commitUser;
+
+    @Before
+    public void before() throws IOException {
+        tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(20221208, 15, 1, 100));
+        write.write(rowData(20221208, 16, 1, 100));
+        write.write(rowData(20221209, 15, 1, 100));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(20221208, 15, 2, 200));
+        write.write(rowData(20221208, 16, 2, 200));
+        write.write(rowData(20221209, 15, 2, 200));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assert.assertEquals(2, snapshot.id());
+        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+        write.close();
+        commit.close();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        DataStreamSource<RowData> source =
+                env.fromElements(rowData(20221208, 15, 0), rowData(20221209, 
15, 0));
+        new CompactorSinkBuilder(table).withInput(source).build();
+        env.execute();
+
+        snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assert.assertEquals(3, snapshot.id());
+        Assert.assertEquals(Snapshot.CommitKind.COMPACT, 
snapshot.commitKind());
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Assert.assertEquals(3, plan.splits().size());
+        for (DataSplit split : plan.splits) {
+            if (split.partition().getInt(1) == 15) {
+                // compacted
+                Assert.assertEquals(1, split.files().size());
+            } else {
+                // not compacted
+                Assert.assertEquals(2, split.files().size());
+            }
+        }
+    }
+
+    private GenericRowData rowData(Object... values) {
+        return GenericRowData.of(values);
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema tableSchema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                Collections.emptyMap(),
+                                ""));
+        return FileStoreTableFactory.create(tablePath, tableSchema);
+    }
+}
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
index 1aa4546f..bb94a6ee 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -53,7 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-/** IT cases for {@link StoreSink} when writing file store and with savepoints. */
+/** IT cases for {@link FileStoreSink} when writing file store and with savepoints. */
 public class SinkSavepointITCase extends AbstractTestBase {
 
     private String path;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
index ed68276c..f7ebba03 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
@@ -52,7 +52,7 @@ import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link CompactorSourceBuilder}. */
+/** IT cases for {@link CompactorSourceBuilder}. */
 public class CompactorSourceITCase extends AbstractTestBase {
 
     private static final RowType ROW_TYPE =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
index 8f6a22fb..b94c3c32 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
@@ -53,16 +53,9 @@ public class BucketsTable implements DataTable {
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable wrapped;
-    private final RowType rowType;
 
     public BucketsTable(FileStoreTable wrapped) {
         this.wrapped = wrapped;
-
-        RowType partitionType = wrapped.schema().logicalPartitionType();
-        List<RowType.RowField> fields = new ArrayList<>(partitionType.getFields());
-        // same with ManifestEntry.schema
-        fields.add(new RowType.RowField("_BUCKET", new IntType()));
-        this.rowType = new RowType(fields);
     }
 
     @Override
@@ -82,7 +75,15 @@ public class BucketsTable implements DataTable {
 
     @Override
     public RowType rowType() {
-        return rowType;
+        RowType partitionType = wrapped.schema().logicalPartitionType();
+        return rowType(partitionType);
+    }
+
+    public static RowType rowType(RowType partitionType) {
+        List<RowType.RowField> fields = new ArrayList<>(partitionType.getFields());
+        // same with ManifestEntry.schema
+        fields.add(new RowType.RowField("_BUCKET", new IntType()));
+        return new RowType(fields);
     }
 
     @Override

Reply via email to