This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ebb1d9c6b [flink] Use union list state to correctly deal with sink 
parallelism changes (#868)
ebb1d9c6b is described below

commit ebb1d9c6b581087616ff6c8a9e4e5e49f21731d1
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 11 15:51:23 2023 +0800

    [flink] Use union list state to correctly deal with sink parallelism 
changes (#868)
---
 docs/content/how-to/writing-tables.md              |   3 +-
 .../generated/flink_connector_configuration.html   |   6 -
 .../apache/paimon/flink/FlinkConnectorOptions.java |   8 +-
 .../paimon/flink/sink/AbstractChannelComputer.java |  71 ------
 .../flink/sink/AbstractStoreWriteOperator.java     | 236 -----------------
 .../flink/sink/BucketingStreamPartitioner.java     |  12 +-
 ...taChannelComputer.java => ChannelComputer.java} |  22 +-
 .../apache/paimon/flink/sink/CompactorSink.java    |   4 +-
 .../apache/paimon/flink/sink/FileStoreSink.java    |   4 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  53 ++--
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  37 +--
 .../flink/sink/FullChangelogStoreSinkWrite.java    | 281 ---------------------
 .../flink/sink/GlobalFullCompactionSinkWrite.java  |  84 +++---
 .../paimon/flink/sink/RowDataChannelComputer.java  |  46 +++-
 .../flink/sink/RowDataStoreWriteOperator.java      | 227 ++++++++++++++++-
 .../org/apache/paimon/flink/sink/StateUtils.java   |  12 +-
 .../paimon/flink/sink/StoreCompactOperator.java    |  42 ++-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |  12 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  42 +--
 .../paimon/flink/sink/StoreSinkWriteState.java     | 148 +++++++++++
 .../flink/sink/cdc/CdcRecordChannelComputer.java   |  39 ++-
 .../sink/cdc/CdcRecordStoreWriteOperator.java      | 143 +++++++++++
 .../apache/paimon/flink/sink/cdc/FlinkCdcSink.java |  14 +-
 .../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java |  48 +---
 .../sink/cdc/SchemaAwareStoreWriteOperator.java    |  89 -------
 .../flink/sink/FileStoreShuffleBucketTest.java     | 169 -------------
 .../flink/sink/RowDataChannelComputerTest.java     | 171 +++++++++++++
 .../paimon/flink/sink/SinkSavepointITCase.java     | 155 ++++++++----
 .../sink/cdc/CdcRecordChannelComputerTest.java     | 161 ++++++++++++
 ...t.java => CdcRecordStoreWriteOperatorTest.java} |  16 +-
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  |   2 -
 31 files changed, 1184 insertions(+), 1173 deletions(-)

diff --git a/docs/content/how-to/writing-tables.md 
b/docs/content/how-to/writing-tables.md
index 006635938..0b408ed5c 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -98,8 +98,7 @@ Use `INSERT INTO` to apply records and changes to tables.
 INSERT INTO MyTable SELECT ...
 ```
 
-Paimon supports shuffle data by bucket in sink phase. To improve data skew, 
Paimon also
-supports shuffling data by partition fields. You can add option 
`sink.partition-shuffle` to the table.
+Paimon supports shuffle data by partition and bucket in sink phase.
 
 {{< /tab >}}
 
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6d48d12f5..8a5951dab 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -74,12 +74,6 @@
             <td>Integer</td>
             <td>Defines a custom parallelism for the sink. By default, if this 
option is not defined, the planner will derive the parallelism for each 
statement individually by also considering the global configuration.</td>
         </tr>
-        <tr>
-            <td><h5>sink.partition-shuffle</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>The option to enable shuffle data by dynamic partition fields 
in sink phase for paimon.</td>
-        </tr>
         <tr>
             <td><h5>streaming-read-atomic</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a1302cdfd..f6b0f1110 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -70,13 +70,6 @@ public class FlinkConnectorOptions {
                                     + "By default, if this option is not 
defined, the planner will derive the parallelism "
                                     + "for each statement individually by also 
considering the global configuration.");
 
-    public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
-            ConfigOptions.key("sink.partition-shuffle")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "The option to enable shuffle data by dynamic 
partition fields in sink phase for paimon.");
-
     public static final ConfigOption<Integer> SCAN_PARALLELISM =
             ConfigOptions.key("scan.parallelism")
                     .intType()
@@ -86,6 +79,7 @@ public class FlinkConnectorOptions {
                                     + "By default, if this option is not 
defined, the planner will derive the parallelism "
                                     + "for each statement individually by also 
considering the global configuration. "
                                     + "If user enable the 
scan.infer-parallelism, the planner will derive the parallelism by inferred 
parallelism.");
+
     public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
             ConfigOptions.key("scan.infer-parallelism")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
deleted file mode 100644
index bb5d364ad..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A utility class to compute which downstream channel a given record should 
be sent to.
- *
- * @param <T> type of record
- */
-public abstract class AbstractChannelComputer<T> {
-
-    private final int numChannels;
-    private final KeyAndBucketExtractor<T> extractor;
-    protected final boolean shuffleByPartitionEnable;
-
-    public AbstractChannelComputer(
-            int numChannels, KeyAndBucketExtractor<T> extractor, boolean 
shuffleByPartitionEnable) {
-        this.numChannels = numChannels;
-        this.extractor = extractor;
-        this.shuffleByPartitionEnable = shuffleByPartitionEnable;
-    }
-
-    public abstract int channel(T record);
-
-    protected int channelImpl(T record, Object... otherChannelKeys) {
-        extractor.setRecord(record);
-        int bucket = extractor.bucket();
-        int otherChannelKeysHash = Objects.hash(otherChannelKeys);
-
-        if (shuffleByPartitionEnable) {
-            BinaryRow partition = extractor.partition();
-            return Math.abs(Objects.hash(bucket, partition, 
otherChannelKeysHash)) % numChannels;
-        } else {
-            return Math.abs(Objects.hash(bucket, otherChannelKeysHash)) % 
numChannels;
-        }
-    }
-
-    /**
-     * Provider of {@link AbstractChannelComputer}.
-     *
-     * @param <T> type of record
-     */
-    public interface Provider<T> extends Serializable {
-
-        AbstractChannelComputer<T> provide(int numChannels);
-
-        String toString();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
deleted file mode 100644
index 9f9d34a4e..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.flink.log.LogWriteCallback;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.List;
-
-/** A {@link PrepareCommitOperator} to write records. */
-public abstract class AbstractStoreWriteOperator<T> extends 
PrepareCommitOperator<T> {
-
-    private static final long serialVersionUID = 2L;
-
-    protected FileStoreTable table;
-
-    @Nullable private final LogSinkFunction logSinkFunction;
-
-    private final StoreSinkWrite.Provider storeSinkWriteProvider;
-
-    protected transient StoreSinkWrite write;
-
-    private transient SimpleContext sinkContext;
-
-    /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
-    private long currentWatermark = Long.MIN_VALUE;
-
-    @Nullable private transient LogWriteCallback logCallback;
-
-    public AbstractStoreWriteOperator(
-            FileStoreTable table,
-            @Nullable LogSinkFunction logSinkFunction,
-            StoreSinkWrite.Provider storeSinkWriteProvider) {
-        this.table = table;
-        this.logSinkFunction = logSinkFunction;
-        this.storeSinkWriteProvider = storeSinkWriteProvider;
-    }
-
-    @Override
-    public void setup(
-            StreamTask<?, ?> containingTask,
-            StreamConfig config,
-            Output<StreamRecord<Committable>> output) {
-        super.setup(containingTask, config, output);
-        if (logSinkFunction != null) {
-            FunctionUtils.setFunctionRuntimeContext(logSinkFunction, 
getRuntimeContext());
-        }
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws 
Exception {
-        super.initializeState(context);
-        write =
-                storeSinkWriteProvider.provide(
-                        table, context, 
getContainingTask().getEnvironment().getIOManager());
-        if (logSinkFunction != null) {
-            StreamingFunctionUtils.restoreFunctionState(context, 
logSinkFunction);
-        }
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-
-        this.sinkContext = new SimpleContext(getProcessingTimeService());
-        if (logSinkFunction != null) {
-            FunctionUtils.openFunction(logSinkFunction, new Configuration());
-            logCallback = new LogWriteCallback();
-            logSinkFunction.setWriteCallback(logCallback);
-        }
-    }
-
-    @Override
-    public void processWatermark(Watermark mark) throws Exception {
-        super.processWatermark(mark);
-
-        this.currentWatermark = mark.getTimestamp();
-        if (logSinkFunction != null) {
-            logSinkFunction.writeWatermark(
-                    new 
org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
-        }
-    }
-
-    @Override
-    public void processElement(StreamRecord<T> element) throws Exception {
-        sinkContext.timestamp = element.hasTimestamp() ? 
element.getTimestamp() : null;
-
-        SinkRecord record = processRecord(element.getValue());
-
-        if (logSinkFunction != null) {
-            // write to log store, need to preserve original pk (which 
includes partition fields)
-            SinkRecord logRecord = write.toLogRecord(record);
-            logSinkFunction.invoke(logRecord, sinkContext);
-        }
-    }
-
-    protected abstract SinkRecord processRecord(T record) throws Exception;
-
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-
-        write.snapshotState(context);
-
-        if (logSinkFunction != null) {
-            StreamingFunctionUtils.snapshotFunctionState(
-                    context, getOperatorStateBackend(), logSinkFunction);
-        }
-    }
-
-    @Override
-    public void finish() throws Exception {
-        super.finish();
-
-        if (logSinkFunction != null) {
-            logSinkFunction.finish();
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-
-        if (write != null) {
-            write.close();
-        }
-
-        if (logSinkFunction != null) {
-            FunctionUtils.closeFunction(logSinkFunction);
-        }
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        super.notifyCheckpointComplete(checkpointId);
-
-        if (logSinkFunction instanceof CheckpointListener) {
-            ((CheckpointListener) 
logSinkFunction).notifyCheckpointComplete(checkpointId);
-        }
-    }
-
-    @Override
-    public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        super.notifyCheckpointAborted(checkpointId);
-
-        if (logSinkFunction instanceof CheckpointListener) {
-            ((CheckpointListener) 
logSinkFunction).notifyCheckpointAborted(checkpointId);
-        }
-    }
-
-    @Override
-    protected List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
-            throws IOException {
-        List<Committable> committables = write.prepareCommit(doCompaction, 
checkpointId);
-
-        if (logCallback != null) {
-            try {
-                logSinkFunction.flush();
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            logCallback
-                    .offsets()
-                    .forEach(
-                            (k, v) ->
-                                    committables.add(
-                                            new Committable(
-                                                    checkpointId,
-                                                    
Committable.Kind.LOG_OFFSET,
-                                                    new 
LogOffsetCommittable(k, v))));
-        }
-
-        return committables;
-    }
-
-    private class SimpleContext implements SinkFunction.Context {
-
-        @Nullable private Long timestamp;
-
-        private final ProcessingTimeService processingTimeService;
-
-        public SimpleContext(ProcessingTimeService processingTimeService) {
-            this.processingTimeService = processingTimeService;
-        }
-
-        @Override
-        public long currentProcessingTime() {
-            return processingTimeService.getCurrentProcessingTime();
-        }
-
-        @Override
-        public long currentWatermark() {
-            return currentWatermark;
-        }
-
-        @Override
-        public Long timestamp() {
-            return timestamp;
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
index c32b56b80..439a97c28 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
@@ -31,18 +31,16 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  */
 public class BucketingStreamPartitioner<T> extends StreamPartitioner<T> {
 
-    private final AbstractChannelComputer.Provider<T> channelComputerProvider;
+    private final ChannelComputer<T> channelComputer;
 
-    private transient AbstractChannelComputer<T> channelComputer;
-
-    public BucketingStreamPartitioner(AbstractChannelComputer.Provider<T> 
channelComputerProvider) {
-        this.channelComputerProvider = channelComputerProvider;
+    public BucketingStreamPartitioner(ChannelComputer<T> channelComputer) {
+        this.channelComputer = channelComputer;
     }
 
     @Override
     public void setup(int numberOfChannels) {
         super.setup(numberOfChannels);
-        channelComputer = channelComputerProvider.provide(numberOfChannels);
+        channelComputer.setup(numberOfChannels);
     }
 
     @Override
@@ -67,6 +65,6 @@ public class BucketingStreamPartitioner<T> extends 
StreamPartitioner<T> {
 
     @Override
     public String toString() {
-        return channelComputerProvider.toString();
+        return channelComputer.toString();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
similarity index 60%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index 2ae41e577..fdef4c141 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -18,20 +18,16 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.schema.TableSchema;
+import java.io.Serializable;
 
-import org.apache.flink.table.data.RowData;
-
-/** {@link AbstractChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer extends AbstractChannelComputer<RowData> {
+/**
+ * A utility class to compute which downstream channel a given record should 
be sent to.
+ *
+ * @param <T> type of record
+ */
+public interface ChannelComputer<T> extends Serializable {
 
-    public RowDataChannelComputer(
-            int numChannels, TableSchema schema, boolean 
shuffleByPartitionEnable) {
-        super(numChannels, new RowDataKeyAndBucketExtractor(schema), 
shuffleByPartitionEnable);
-    }
+    void setup(int numChannels);
 
-    @Override
-    public int channel(RowData record) {
-        return channelImpl(record);
-    }
+    int channel(T record);
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index ee26918a9..45975f219 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -39,8 +39,8 @@ public class CompactorSink extends FlinkSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
-        return new StoreCompactOperator(table, writeProvider, isStreaming);
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new StoreCompactOperator(table, writeProvider, isStreaming, 
commitUser);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index b8d5bf2af..cd2ff68d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -53,8 +53,8 @@ public class FileStoreSink extends FlinkSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
-        return new RowDataStoreWriteOperator(table, logSinkFunction, 
writeProvider);
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new RowDataStoreWriteOperator(table, logSinkFunction, 
writeProvider, commitUser);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f0fe07081..d0a456d26 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -61,48 +61,46 @@ public abstract class FlinkSink<T> implements Serializable {
         this.isOverwrite = isOverwrite;
     }
 
-    protected StoreSinkWrite.Provider createWriteProvider(String 
initialCommitUser) {
+    private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig 
checkpointConfig) {
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
         } else {
             Options options = table.coreOptions().toConfiguration();
             ChangelogProducer changelogProducer = 
table.coreOptions().changelogProducer();
-            if (changelogProducer == ChangelogProducer.FULL_COMPACTION
-                    && 
options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+            waitCompaction =
+                    changelogProducer == ChangelogProducer.LOOKUP
+                            && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+
+            int deltaCommits = -1;
+            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
+            } else if 
(options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
                 long fullCompactionThresholdMs =
                         
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
-                return (table, context, ioManager) ->
-                        new FullChangelogStoreSinkWrite(
-                                table,
-                                context,
-                                initialCommitUser,
-                                ioManager,
-                                isOverwrite,
-                                fullCompactionThresholdMs);
+                deltaCommits =
+                        (int)
+                                (fullCompactionThresholdMs
+                                        / 
checkpointConfig.getCheckpointInterval());
             }
 
-            waitCompaction =
-                    changelogProducer == ChangelogProducer.LOOKUP
-                            && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
-            if (changelogProducer == ChangelogProducer.FULL_COMPACTION
-                    || options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
-                int deltaCommits = 
options.getOptional(FULL_COMPACTION_DELTA_COMMITS).orElse(1);
-                return (table, context, ioManager) ->
+            if (changelogProducer == ChangelogProducer.FULL_COMPACTION || 
deltaCommits >= 0) {
+                int finalDeltaCommits = Math.max(deltaCommits, 1);
+                return (table, commitUser, state, ioManager) ->
                         new GlobalFullCompactionSinkWrite(
                                 table,
-                                context,
-                                initialCommitUser,
+                                commitUser,
+                                state,
                                 ioManager,
                                 isOverwrite,
                                 waitCompaction,
-                                deltaCommits);
+                                finalDeltaCommits);
             }
         }
 
-        return (table, context, ioManager) ->
+        return (table, commitUser, state, ioManager) ->
                 new StoreSinkWriteImpl(
-                        table, context, initialCommitUser, ioManager, 
isOverwrite, waitCompaction);
+                        table, commitUser, state, ioManager, isOverwrite, 
waitCompaction);
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
@@ -112,7 +110,10 @@ public abstract class FlinkSink<T> implements Serializable 
{
         // When the job restarts, commitUser will be recovered from states and 
this value is
         // ignored.
         String initialCommitUser = UUID.randomUUID().toString();
-        return sinkFrom(input, initialCommitUser, 
createWriteProvider(initialCommitUser));
+        return sinkFrom(
+                input,
+                initialCommitUser,
+                
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
     }
 
     public DataStreamSink<?> sinkFrom(
@@ -134,7 +135,7 @@ public abstract class FlinkSink<T> implements Serializable {
                 input.transform(
                                 WRITER_NAME + " -> " + table.name(),
                                 typeInfo,
-                                createWriteOperator(sinkProvider, isStreaming))
+                                createWriteOperator(sinkProvider, isStreaming, 
commitUser))
                         .setParallelism(input.getParallelism());
 
         SingleOutputStreamOperator<?> committed =
@@ -165,7 +166,7 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
     protected abstract OneInputStreamOperator<T, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming);
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser);
 
     protected abstract SerializableFunction<String, Committer> 
createCommitterFactory(
             boolean streamingCheckpointEnabled);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index acdc76f33..c55c297f5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -19,9 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.operation.Lock;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -85,14 +83,9 @@ public class FlinkSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        TableSchema schema = table.schema();
-        boolean shuffleByPartitionEnable =
-                table.coreOptions()
-                        .toConfiguration()
-                        .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION);
         BucketingStreamPartitioner<RowData> partitioner =
                 new BucketingStreamPartitioner<>(
-                        new ChannelComputerProvider(schema, 
shuffleByPartitionEnable));
+                        new RowDataChannelComputer(table.schema(), 
logSinkFunction != null));
         PartitionTransformation<RowData> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), 
partitioner);
         if (parallelism != null) {
@@ -106,32 +99,4 @@ public class FlinkSinkBuilder {
                 ? sink.sinkFrom(new DataStream<>(env, partitioned), 
commitUser, sinkProvider)
                 : sink.sinkFrom(new DataStream<>(env, partitioned));
     }
-
-    private static class ChannelComputerProvider
-            implements AbstractChannelComputer.Provider<RowData> {
-
-        private static final long serialVersionUID = 1L;
-
-        private final TableSchema schema;
-        private final boolean shuffleByPartitionEnable;
-
-        private ChannelComputerProvider(TableSchema schema, boolean 
shuffleByPartitionEnable) {
-            this.schema = schema;
-            this.shuffleByPartitionEnable = shuffleByPartitionEnable;
-        }
-
-        @Override
-        public AbstractChannelComputer<RowData> provide(int numChannels) {
-            return new RowDataChannelComputer(numChannels, schema, 
shuffleByPartitionEnable);
-        }
-
-        @Override
-        public String toString() {
-            if (shuffleByPartitionEnable) {
-                return "HASH[bucket, partition]";
-            } else {
-                return "HASH[bucket]";
-            }
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
deleted file mode 100644
index 932a8bb01..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.BinaryRowTypeSerializer;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.paimon.utils.SnapshotManager;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * {@link StoreSinkWrite} for {@link 
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
- * producer. This writer will perform full compaction once in a while.
- *
- * @deprecated use {@link GlobalFullCompactionSinkWrite}.
- */
-public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
-
-    private final SnapshotManager snapshotManager;
-    private final long fullCompactionThresholdMs;
-
-    private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
-    private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>> 
writtenBuckets;
-    private final ListState<Tuple3<Long, BinaryRow, Integer>> 
writtenBucketState;
-
-    private Long currentFirstWriteMs;
-    private final NavigableMap<Long, Long> firstWriteMs;
-    private final ListState<Tuple2<Long, Long>> firstWriteMsState;
-
-    private final TreeSet<Long> commitIdentifiersToCheck;
-
-    public FullChangelogStoreSinkWrite(
-            FileStoreTable table,
-            StateInitializationContext context,
-            String initialCommitUser,
-            IOManager ioManager,
-            boolean isOverwrite,
-            long fullCompactionThresholdMs)
-            throws Exception {
-        super(table, context, initialCommitUser, ioManager, isOverwrite, 
false);
-
-        this.snapshotManager = table.snapshotManager();
-        this.fullCompactionThresholdMs = fullCompactionThresholdMs;
-
-        currentWrittenBuckets = new HashSet<>();
-        TupleSerializer<Tuple3<Long, BinaryRow, Integer>> 
writtenBucketStateSerializer =
-                new TupleSerializer<>(
-                        (Class<Tuple3<Long, BinaryRow, Integer>>) (Class<?>) 
Tuple3.class,
-                        new TypeSerializer[] {
-                            LongSerializer.INSTANCE,
-                            new BinaryRowTypeSerializer(
-                                    
table.schema().logicalPartitionType().getFieldCount()),
-                            IntSerializer.INSTANCE
-                        });
-        writtenBucketState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        "paimon_written_buckets", 
writtenBucketStateSerializer));
-        writtenBuckets = new TreeMap<>();
-        writtenBucketState
-                .get()
-                .forEach(
-                        t ->
-                                writtenBuckets
-                                        .computeIfAbsent(t.f0, k -> new 
HashSet<>())
-                                        .add(Tuple2.of(t.f1, t.f2)));
-
-        currentFirstWriteMs = null;
-        TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
-                new TupleSerializer<>(
-                        (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-                        new TypeSerializer[] {LongSerializer.INSTANCE, 
LongSerializer.INSTANCE});
-        firstWriteMsState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        "first_write_ms", 
firstWriteMsStateSerializer));
-        firstWriteMs = new TreeMap<>();
-        firstWriteMsState.get().forEach(t -> firstWriteMs.put(t.f0, t.f1));
-
-        commitIdentifiersToCheck = new TreeSet<>();
-    }
-
-    @Override
-    public SinkRecord write(InternalRow rowData) throws Exception {
-        SinkRecord sinkRecord = super.write(rowData);
-        touchBucket(sinkRecord.partition(), sinkRecord.bucket());
-        return sinkRecord;
-    }
-
-    @Override
-    public void compact(BinaryRow partition, int bucket, boolean 
fullCompaction) throws Exception {
-        super.compact(partition, bucket, fullCompaction);
-        touchBucket(partition, bucket);
-    }
-
-    private void touchBucket(BinaryRow partition, int bucket) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("touch partition {}, bucket {}", partition, bucket);
-        }
-
-        // partition is a reused BinaryRow
-        // we first check if the tuple exists to minimize copying
-        if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
-            currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket));
-        }
-
-        if (currentFirstWriteMs == null) {
-            currentFirstWriteMs = System.currentTimeMillis();
-        }
-    }
-
-    @Override
-    public List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
-            throws IOException {
-        checkSuccessfulFullCompaction();
-
-        // check what buckets we've modified during this checkpoint interval
-        if (!currentWrittenBuckets.isEmpty()) {
-            writtenBuckets
-                    .computeIfAbsent(checkpointId, k -> new HashSet<>())
-                    .addAll(currentWrittenBuckets);
-            currentWrittenBuckets.clear();
-            firstWriteMs.putIfAbsent(checkpointId, currentFirstWriteMs);
-            currentFirstWriteMs = null;
-        }
-
-        if (LOG.isDebugEnabled()) {
-            for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> 
checkpointIdAndBuckets :
-                    writtenBuckets.entrySet()) {
-                LOG.debug(
-                        "Written buckets for checkpoint #{} are:", 
checkpointIdAndBuckets.getKey());
-                for (Tuple2<BinaryRow, Integer> bucket : 
checkpointIdAndBuckets.getValue()) {
-                    LOG.debug("  * partition {}, bucket {}", bucket.f0, 
bucket.f1);
-                }
-            }
-        }
-
-        if (!writtenBuckets.isEmpty() // there should be something to compact
-                && System.currentTimeMillis() - 
firstWriteMs.firstEntry().getValue()
-                        >= fullCompactionThresholdMs // time without full 
compaction exceeds
-        ) {
-            doCompaction = true;
-        }
-
-        if (doCompaction) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Submit full compaction for checkpoint #{}", 
checkpointId);
-            }
-            submitFullCompaction();
-            commitIdentifiersToCheck.add(checkpointId);
-        }
-
-        return super.prepareCommit(doCompaction, checkpointId);
-    }
-
-    private void checkSuccessfulFullCompaction() {
-        Long latestId = snapshotManager.latestSnapshotId();
-        if (latestId == null) {
-            return;
-        }
-        Long earliestId = snapshotManager.earliestSnapshotId();
-        if (earliestId == null) {
-            return;
-        }
-
-        for (long id = latestId; id >= earliestId; id--) {
-            Snapshot snapshot = snapshotManager.snapshot(id);
-            if (snapshot.commitUser().equals(commitUser)
-                    && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
-                long commitIdentifier = snapshot.commitIdentifier();
-                if (commitIdentifiersToCheck.contains(commitIdentifier)) {
-                    // We found a full compaction snapshot triggered by 
`submitFullCompaction`
-                    // method.
-                    //
-                    // Because `submitFullCompaction` will compact all buckets 
in `writtenBuckets`,
-                    // thus a successful commit indicates that all previous 
buckets have been
-                    // compacted.
-                    //
-                    // We must make sure that the compact snapshot is 
triggered by
-                    // `submitFullCompaction`, because normal compaction may 
also trigger full
-                    // compaction, but that only compacts a specific bucket, 
not all buckets
-                    // recorded in `writtenBuckets`.
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "Found full compaction snapshot #{} with 
identifier {}",
-                                id,
-                                commitIdentifier);
-                    }
-                    writtenBuckets.headMap(commitIdentifier, true).clear();
-                    firstWriteMs.headMap(commitIdentifier, true).clear();
-                    commitIdentifiersToCheck.headSet(commitIdentifier).clear();
-                    break;
-                }
-            }
-        }
-    }
-
-    private void submitFullCompaction() {
-        Set<Tuple2<BinaryRow, Integer>> compactedBuckets = new HashSet<>();
-        writtenBuckets.forEach(
-                (checkpointId, buckets) -> {
-                    for (Tuple2<BinaryRow, Integer> bucket : buckets) {
-                        if (compactedBuckets.contains(bucket)) {
-                            continue;
-                        }
-                        compactedBuckets.add(bucket);
-                        try {
-                            write.compact(bucket.f0, bucket.f1, true);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-
-        List<Tuple3<Long, BinaryRow, Integer>> writtenBucketList = new 
ArrayList<>();
-        for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry : 
writtenBuckets.entrySet()) {
-            for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
-                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, 
bucket.f1));
-            }
-        }
-        writtenBucketState.update(writtenBucketList);
-
-        List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
-        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
-            firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
-        }
-        firstWriteMsState.update(firstWriteMsList);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index ac666ff12..68bc682ec 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -21,22 +21,12 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.BinaryRowTypeSerializer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,53 +52,41 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
 
     private final int deltaCommits;
 
+    private final String tableName;
     private final SnapshotManager snapshotManager;
 
     private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
     private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>> 
writtenBuckets;
-    private final ListState<Tuple3<Long, BinaryRow, Integer>> 
writtenBucketState;
+    private static final String WRITTEN_BUCKETS_STATE_NAME = 
"paimon_written_buckets";
 
     private final TreeSet<Long> commitIdentifiersToCheck;
 
     public GlobalFullCompactionSinkWrite(
             FileStoreTable table,
-            StateInitializationContext context,
-            String initialCommitUser,
+            String commitUser,
+            StoreSinkWriteState state,
             IOManager ioManager,
             boolean isOverwrite,
             boolean waitCompaction,
-            int deltaCommits)
-            throws Exception {
-        super(table, context, initialCommitUser, ioManager, isOverwrite, 
waitCompaction);
+            int deltaCommits) {
+        super(table, commitUser, state, ioManager, isOverwrite, 
waitCompaction);
 
         this.deltaCommits = deltaCommits;
 
+        this.tableName = table.name();
         this.snapshotManager = table.snapshotManager();
 
         currentWrittenBuckets = new HashSet<>();
-        @SuppressWarnings("unchecked")
-        TupleSerializer<Tuple3<Long, BinaryRow, Integer>> 
writtenBucketStateSerializer =
-                new TupleSerializer<>(
-                        (Class<Tuple3<Long, BinaryRow, Integer>>) (Class<?>) 
Tuple3.class,
-                        new TypeSerializer[] {
-                            LongSerializer.INSTANCE,
-                            new BinaryRowTypeSerializer(
-                                    
table.schema().logicalPartitionType().getFieldCount()),
-                            IntSerializer.INSTANCE
-                        });
-        writtenBucketState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        "paimon_written_buckets", 
writtenBucketStateSerializer));
         writtenBuckets = new TreeMap<>();
-        writtenBucketState
-                .get()
-                .forEach(
-                        t ->
-                                writtenBuckets
-                                        .computeIfAbsent(t.f0, k -> new 
HashSet<>())
-                                        .add(Tuple2.of(t.f1, t.f2)));
+        List<StoreSinkWriteState.StateValue> writtenBucketStateValues =
+                state.get(tableName, WRITTEN_BUCKETS_STATE_NAME);
+        if (writtenBucketStateValues != null) {
+            for (StoreSinkWriteState.StateValue stateValue : 
writtenBucketStateValues) {
+                writtenBuckets
+                        .computeIfAbsent(bytesToLong(stateValue.value()), k -> 
new HashSet<>())
+                        .add(Tuple2.of(stateValue.partition(), 
stateValue.bucket()));
+            }
+        }
 
         commitIdentifiersToCheck = new TreeSet<>();
     }
@@ -240,15 +218,35 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
     }
 
     @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
+    public void snapshotState() throws Exception {
+        super.snapshotState();
 
-        List<Tuple3<Long, BinaryRow, Integer>> writtenBucketList = new 
ArrayList<>();
+        List<StoreSinkWriteState.StateValue> writtenBucketList = new 
ArrayList<>();
         for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry : 
writtenBuckets.entrySet()) {
             for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
-                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, 
bucket.f1));
+                writtenBucketList.add(
+                        new StoreSinkWriteState.StateValue(
+                                bucket.f0, bucket.f1, 
longToBytes(entry.getKey())));
             }
         }
-        writtenBucketState.update(writtenBucketList);
+        state.put(tableName, WRITTEN_BUCKETS_STATE_NAME, writtenBucketList);
+    }
+
+    private static byte[] longToBytes(long l) {
+        byte[] result = new byte[8];
+        for (int i = 7; i >= 0; i--) {
+            result[i] = (byte) (l & 0xFF);
+            l >>= 8;
+        }
+        return result;
+    }
+
+    private static long bytesToLong(final byte[] b) {
+        long result = 0;
+        for (int i = 0; i < 8; i++) {
+            result <<= 8;
+            result |= (b[i] & 0xFF);
+        }
+        return result;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 2ae41e577..85e7ebce7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -18,20 +18,54 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
 import org.apache.flink.table.data.RowData;
 
-/** {@link AbstractChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer extends AbstractChannelComputer<RowData> {
+/** {@link ChannelComputer} for {@link RowData}. */
+public class RowDataChannelComputer implements ChannelComputer<RowData> {
 
-    public RowDataChannelComputer(
-            int numChannels, TableSchema schema, boolean 
shuffleByPartitionEnable) {
-        super(numChannels, new RowDataKeyAndBucketExtractor(schema), 
shuffleByPartitionEnable);
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+    private final boolean hasLogSink;
+
+    private transient int numChannels;
+    private transient KeyAndBucketExtractor<RowData> extractor;
+
+    public RowDataChannelComputer(TableSchema schema, boolean hasLogSink) {
+        this.schema = schema;
+        this.hasLogSink = hasLogSink;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new RowDataKeyAndBucketExtractor(schema);
     }
 
     @Override
     public int channel(RowData record) {
-        return channelImpl(record);
+        extractor.setRecord(record);
+        return channel(extractor.partition(), extractor.bucket());
+    }
+
+    public int channel(BinaryRow partition, int bucket) {
+        int startChannel;
+        if (hasLogSink) {
+            // log sinks like Kafka only consider bucket and don't care about 
partition
+            // so same bucket, even from different partition, must go to the 
same channel
+            startChannel = 0;
+        } else {
+            startChannel = Math.abs(partition.hashCode()) % numChannels;
+        }
+        return (startChannel + bucket) % numChannels;
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by bucket";
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 55a6b8375..16f5f98f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -19,34 +19,243 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.log.LogWriteCallback;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 
-/**
- * An {@link AbstractStoreWriteOperator} which accepts Flink's {@link 
RowData}. The schema of its
- * writer never changes.
- */
-public class RowDataStoreWriteOperator extends 
AbstractStoreWriteOperator<RowData> {
+/** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is 
fixed. */
+public class RowDataStoreWriteOperator extends PrepareCommitOperator<RowData> {
+
+    private static final long serialVersionUID = 3L;
+
+    private final FileStoreTable table;
+    @Nullable private final LogSinkFunction logSinkFunction;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
+    private final String initialCommitUser;
+
+    private transient StoreSinkWriteState state;
+    private transient StoreSinkWrite write;
+    private transient SimpleContext sinkContext;
+    @Nullable private transient LogWriteCallback logCallback;
+
+    /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
+    private long currentWatermark = Long.MIN_VALUE;
 
     public RowDataStoreWriteOperator(
             FileStoreTable table,
             @Nullable LogSinkFunction logSinkFunction,
-            StoreSinkWrite.Provider storeSinkWriteProvider) {
-        super(table, logSinkFunction, storeSinkWriteProvider);
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        this.table = table;
+        this.logSinkFunction = logSinkFunction;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
+        this.initialCommitUser = initialCommitUser;
     }
 
     @Override
-    protected SinkRecord processRecord(RowData record) throws Exception {
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Committable>> output) {
+        super.setup(containingTask, config, output);
+        if (logSinkFunction != null) {
+            FunctionUtils.setFunctionRuntimeContext(logSinkFunction, 
getRuntimeContext());
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Each job can only have one user name and this name must be 
consistent across restarts.
+        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        String commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, 
initialCommitUser);
+
+        RowDataChannelComputer channelComputer =
+                new RowDataChannelComputer(table.schema(), logSinkFunction != 
null);
+        
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
+        state =
+                new StoreSinkWriteState(
+                        context,
+                        (tableName, partition, bucket) ->
+                                channelComputer.channel(partition, bucket)
+                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+
+        write =
+                storeSinkWriteProvider.provide(
+                        table,
+                        commitUser,
+                        state,
+                        getContainingTask().getEnvironment().getIOManager());
+
+        if (logSinkFunction != null) {
+            StreamingFunctionUtils.restoreFunctionState(context, 
logSinkFunction);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.sinkContext = new SimpleContext(getProcessingTimeService());
+        if (logSinkFunction != null) {
+            FunctionUtils.openFunction(logSinkFunction, new Configuration());
+            logCallback = new LogWriteCallback();
+            logSinkFunction.setWriteCallback(logCallback);
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+
+        this.currentWatermark = mark.getTimestamp();
+        if (logSinkFunction != null) {
+            logSinkFunction.writeWatermark(
+                    new 
org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        sinkContext.timestamp = element.hasTimestamp() ? 
element.getTimestamp() : null;
+
+        SinkRecord record;
         try {
-            return write.write(new FlinkRowWrapper(record));
+            record = write.write(new FlinkRowWrapper(element.getValue()));
         } catch (Exception e) {
             throw new IOException(e);
         }
+
+        if (logSinkFunction != null) {
+            // write to log store, need to preserve original pk (which 
includes partition fields)
+            SinkRecord logRecord = write.toLogRecord(record);
+            logSinkFunction.invoke(logRecord, sinkContext);
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        write.snapshotState();
+        state.snapshotState();
+
+        if (logSinkFunction != null) {
+            StreamingFunctionUtils.snapshotFunctionState(
+                    context, getOperatorStateBackend(), logSinkFunction);
+        }
+    }
+
+    @Override
+    public void finish() throws Exception {
+        super.finish();
+
+        if (logSinkFunction != null) {
+            logSinkFunction.finish();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+
+        write.close();
+        if (logSinkFunction != null) {
+            FunctionUtils.closeFunction(logSinkFunction);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+
+        if (logSinkFunction instanceof CheckpointListener) {
+            ((CheckpointListener) 
logSinkFunction).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        super.notifyCheckpointAborted(checkpointId);
+
+        if (logSinkFunction instanceof CheckpointListener) {
+            ((CheckpointListener) 
logSinkFunction).notifyCheckpointAborted(checkpointId);
+        }
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
+            throws IOException {
+        List<Committable> committables = write.prepareCommit(doCompaction, 
checkpointId);
+
+        if (logCallback != null) {
+            try {
+                logSinkFunction.flush();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+            logCallback
+                    .offsets()
+                    .forEach(
+                            (k, v) ->
+                                    committables.add(
+                                            new Committable(
+                                                    checkpointId,
+                                                    
Committable.Kind.LOG_OFFSET,
+                                                    new 
LogOffsetCommittable(k, v))));
+        }
+
+        return committables;
+    }
+
+    private class SimpleContext implements SinkFunction.Context {
+
+        @Nullable private Long timestamp;
+
+        private final ProcessingTimeService processingTimeService;
+
+        public SimpleContext(ProcessingTimeService processingTimeService) {
+            this.processingTimeService = processingTimeService;
+        }
+
+        @Override
+        public long currentProcessingTime() {
+            return processingTimeService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public long currentWatermark() {
+            return currentWatermark;
+        }
+
+        @Override
+        public Long timestamp() {
+            return timestamp;
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
index 53756a43b..934aceeca 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
@@ -39,22 +39,12 @@ public class StateUtils {
             throws Exception {
         ListState<T> state =
                 context.getOperatorStateStore()
-                        .getListState(new ListStateDescriptor<>(stateName, 
valueClass));
+                        .getUnionListState(new 
ListStateDescriptor<>(stateName, valueClass));
 
         List<T> values = new ArrayList<>();
         state.get().forEach(values::add);
 
         if (context.isRestored()) {
-            // Values may contain 0 element or more than 1 element.
-            //
-            // Let's say a vertex has 3 tasks (A, B and C). If A is finished 
while B and C are still
-            // running, then states of A will be divided between B and C. That 
is, if the job
-            // restarts, state of vertex A will be empty, and state of vertex 
B and C may contain
-            // more than 1 element.
-            if (values.isEmpty()) {
-                return null;
-            }
-
             // As we're storing the same value for each task, we hereby check 
if all elements are
             // equal.
             for (int i = 1; i < values.size(); i++) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 89ab1e721..156b3a798 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.OffsetRow;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 
@@ -47,7 +48,9 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final boolean isStreaming;
+    private final String initialCommitUser;
 
+    private transient StoreSinkWriteState state;
     private transient StoreSinkWrite write;
     private transient InternalRowSerializer partitionSerializer;
     private transient OffsetRow reusedPartition;
@@ -56,21 +59,43 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
     public StoreCompactOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
-            boolean isStreaming) {
+            boolean isStreaming,
+            String initialCommitUser) {
         Preconditions.checkArgument(
                 !table.coreOptions().writeOnly(),
                 CoreOptions.WRITE_ONLY.key() + " should not be true for 
StoreCompactOperator.");
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
         this.isStreaming = isStreaming;
+        this.initialCommitUser = initialCommitUser;
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
+
+        // Each job can only have one user name and this name must be 
consistent across restarts.
+        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        String commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, 
initialCommitUser);
+
+        RowDataChannelComputer channelComputer = new 
RowDataChannelComputer(table.schema(), false);
+        
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
+        state =
+                new StoreSinkWriteState(
+                        context,
+                        (tableName, partition, bucket) ->
+                                channelComputer.channel(partition, bucket)
+                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+
         write =
                 storeSinkWriteProvider.provide(
-                        table, context, 
getContainingTask().getEnvironment().getIOManager());
+                        table,
+                        commitUser,
+                        state,
+                        getContainingTask().getEnvironment().getIOManager());
     }
 
     @Override
@@ -112,4 +137,17 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
             throws IOException {
         return write.prepareCommit(doCompaction, checkpointId);
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        write.snapshotState();
+        state.snapshotState();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        write.close();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 4e3e3a60a..0eae74e00 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -26,15 +26,13 @@ import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.table.sink.TableWriteImpl;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.function.Function;
 
-/** Helper class of {@link AbstractStoreWriteOperator} for different types of 
paimon sinks. */
+/** Helper class of {@link PrepareCommitOperator} for different types of 
paimon sinks. */
 public interface StoreSinkWrite {
 
     SinkRecord write(InternalRow rowData) throws Exception;
@@ -47,7 +45,7 @@ public interface StoreSinkWrite {
 
     List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) 
throws IOException;
 
-    void snapshotState(StateSnapshotContext context) throws Exception;
+    void snapshotState() throws Exception;
 
     void close() throws Exception;
 
@@ -68,7 +66,9 @@ public interface StoreSinkWrite {
     interface Provider extends Serializable {
 
         StoreSinkWrite provide(
-                FileStoreTable table, StateInitializationContext context, 
IOManager ioManager)
-                throws Exception;
+                FileStoreTable table,
+                String commitUser,
+                StoreSinkWriteState state,
+                IOManager ioManager);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index e5988cca9..67f6ddab1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -29,8 +29,6 @@ import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.table.sink.TableWriteImpl;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,42 +43,26 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     private static final Logger LOG = 
LoggerFactory.getLogger(StoreSinkWriteImpl.class);
 
     protected final String commitUser;
+    protected final StoreSinkWriteState state;
     private final boolean waitCompaction;
 
     protected TableWriteImpl<?> write;
 
     public StoreSinkWriteImpl(
             FileStoreTable table,
-            StateInitializationContext context,
-            String initialCommitUser,
+            String commitUser,
+            StoreSinkWriteState state,
             IOManager ioManager,
             boolean isOverwrite,
-            boolean waitCompaction)
-            throws Exception {
-        // Each job can only have one user name and this name must be 
consistent across restarts.
-        // We cannot use job id as commit user name here because user may 
change job id by creating
-        // a savepoint, stop the job and then resume from savepoint.
-        commitUser =
-                StateUtils.getSingleValueFromState(
-                        context, "commit_user_state", String.class, 
initialCommitUser);
-
-        // State will be null if the upstream of this subtask has finished, 
but some other subtasks
-        // are still running.
-        // See comments of StateUtils.getSingleValueFromState for more detail.
-        //
-        // If the state is null, no new records will come. We only need to 
deal with checkpoints and
-        // close events.
-        if (commitUser == null) {
-            write = null;
-        } else {
-            write =
-                    table.newWrite(commitUser)
-                            .withIOManager(
-                                    new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
-                            .withOverwrite(isOverwrite);
-        }
-
+            boolean waitCompaction) {
+        this.commitUser = commitUser;
+        this.state = state;
         this.waitCompaction = waitCompaction;
+
+        write =
+                table.newWrite(commitUser)
+                        .withIOManager(new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+                        .withOverwrite(isOverwrite);
     }
 
     @Override
@@ -131,7 +113,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     }
 
     @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
+    public void snapshotState() throws Exception {
         // do nothing
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
new file mode 100644
index 000000000..feff9e9dd
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * States for {@link StoreSinkWrite}s.
+ *
+ * <p>States are positioned first by table name and then by key name. This 
class should be initiated
+ * in a sink operator and then given to {@link StoreSinkWrite}.
+ */
+public class StoreSinkWriteState {
+
+    private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> 
listState;
+    private final Map<String, Map<String, List<StateValue>>> map;
+
+    @SuppressWarnings("unchecked")
+    public StoreSinkWriteState(
+            StateInitializationContext context, StateValueFilter 
stateValueFilter)
+            throws Exception {
+        TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> 
listStateSerializer =
+                new TupleSerializer<>(
+                        (Class<Tuple5<String, String, byte[], Integer, 
byte[]>>)
+                                (Class<?>) Tuple5.class,
+                        new TypeSerializer[] {
+                            StringSerializer.INSTANCE,
+                            StringSerializer.INSTANCE,
+                            BytePrimitiveArraySerializer.INSTANCE,
+                            IntSerializer.INSTANCE,
+                            BytePrimitiveArraySerializer.INSTANCE
+                        });
+        listState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>(
+                                        "paimon_store_sink_write_state", 
listStateSerializer));
+
+        map = new HashMap<>();
+        for (Tuple5<String, String, byte[], Integer, byte[]> tuple : 
listState.get()) {
+            BinaryRow partition = 
SerializationUtils.deserializeBinaryRow(tuple.f2);
+            if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
+                map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
+                        .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
+                        .add(new StateValue(partition, tuple.f3, tuple.f4));
+            }
+        }
+    }
+
+    public @Nullable List<StateValue> get(String tableName, String key) {
+        Map<String, List<StateValue>> innerMap = map.get(tableName);
+        return innerMap == null ? null : innerMap.get(key);
+    }
+
+    public void put(String tableName, String key, List<StateValue> 
stateValues) {
+        map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, 
stateValues);
+    }
+
+    public void snapshotState() throws Exception {
+        List<Tuple5<String, String, byte[], Integer, byte[]>> list = new 
ArrayList<>();
+        for (Map.Entry<String, Map<String, List<StateValue>>> tables : 
map.entrySet()) {
+            for (Map.Entry<String, List<StateValue>> entry : 
tables.getValue().entrySet()) {
+                for (StateValue stateValue : entry.getValue()) {
+                    list.add(
+                            Tuple5.of(
+                                    tables.getKey(),
+                                    entry.getKey(),
+                                    
SerializationUtils.serializeBinaryRow(stateValue.partition()),
+                                    stateValue.bucket(),
+                                    stateValue.value()));
+                }
+            }
+        }
+        listState.update(list);
+    }
+
+    /**
+     * A state value for {@link StoreSinkWrite}. All state values should be 
given a partition and a
+     * bucket so that they can be redistributed once the sink parallelism is 
changed.
+     */
+    public static class StateValue {
+
+        private final BinaryRow partition;
+        private final int bucket;
+        private final byte[] value;
+
+        public StateValue(BinaryRow partition, int bucket, byte[] value) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.value = value;
+        }
+
+        public BinaryRow partition() {
+            return partition;
+        }
+
+        public int bucket() {
+            return bucket;
+        }
+
+        public byte[] value() {
+            return value;
+        }
+    }
+
+    /**
+     * Given the table name, partition and bucket of a {@link StateValue} in a 
union list state,
+     * decide whether to keep this {@link StateValue} in this subtask.
+     */
+    public interface StateValueFilter {
+
+        boolean filter(String tableName, BinaryRow partition, int bucket);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index 8a815bb69..c15340a74 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -18,19 +18,44 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.flink.sink.AbstractChannelComputer;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
-/** {@link AbstractChannelComputer} for {@link CdcRecord}. */
-public class CdcRecordChannelComputer extends 
AbstractChannelComputer<CdcRecord> {
+/** {@link ChannelComputer} for {@link CdcRecord}. */
+public class CdcRecordChannelComputer implements ChannelComputer<CdcRecord> {
 
-    public CdcRecordChannelComputer(
-            int numChannels, TableSchema schema, boolean 
shuffleByPartitionEnable) {
-        super(numChannels, new CdcRecordKeyAndBucketExtractor(schema), 
shuffleByPartitionEnable);
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels;
+    private transient KeyAndBucketExtractor<CdcRecord> extractor;
+
+    public CdcRecordChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
     }
 
     @Override
     public int channel(CdcRecord record) {
-        return channelImpl(record);
+        extractor.setRecord(record);
+        return channel(extractor.partition(), extractor.bucket());
+    }
+
+    public int channel(BinaryRow partition, int bucket) {
+        int startChannel = Math.abs(partition.hashCode()) % numChannels;
+        return (startChannel + bucket) % numChannels;
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by bucket";
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
new file mode 100644
index 000000000..df45d8281
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.PrepareCommitOperator;
+import org.apache.paimon.flink.sink.StateUtils;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteState;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A {@link PrepareCommitOperator} to write {@link CdcRecord}. Record schema 
may change. If current
+ * known schema does not fit record schema, this operator will wait for schema 
changes.
+ */
+public class CdcRecordStoreWriteOperator extends 
PrepareCommitOperator<CdcRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+            ConfigOptions.key("cdc.retry-sleep-time")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(500));
+
+    private FileStoreTable table;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
+    private final String initialCommitUser;
+    private final long retrySleepMillis;
+
+    private transient StoreSinkWriteState state;
+    private transient StoreSinkWrite write;
+
+    public CdcRecordStoreWriteOperator(
+            FileStoreTable table,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        this.table = table;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
+        this.initialCommitUser = initialCommitUser;
+        this.retrySleepMillis =
+                
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Each job can only have one user name and this name must be 
consistent across restarts.
+        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        String commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, 
initialCommitUser);
+
+        CdcRecordChannelComputer channelComputer = new 
CdcRecordChannelComputer(table.schema());
+        
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
+        state =
+                new StoreSinkWriteState(
+                        context,
+                        (tableName, partition, bucket) ->
+                                channelComputer.channel(partition, bucket)
+                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+
+        table = table.copyWithLatestSchema();
+        write =
+                storeSinkWriteProvider.provide(
+                        table,
+                        commitUser,
+                        state,
+                        getContainingTask().getEnvironment().getIOManager());
+    }
+
+    @Override
+    public void processElement(StreamRecord<CdcRecord> element) throws 
Exception {
+        CdcRecord record = element.getValue();
+        Optional<GenericRow> optionalConverted = 
record.toGenericRow(table.schema().fields());
+        if (!optionalConverted.isPresent()) {
+            while (true) {
+                table = table.copyWithLatestSchema();
+                optionalConverted = 
record.toGenericRow(table.schema().fields());
+                if (optionalConverted.isPresent()) {
+                    break;
+                }
+                Thread.sleep(retrySleepMillis);
+            }
+            write.replace(commitUser -> table.newWrite(commitUser));
+        }
+
+        try {
+            write.write(optionalConverted.get());
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        write.snapshotState();
+        state.snapshotState();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        write.close();
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
+            throws IOException {
+        return write.prepareCommit(doCompaction, checkpointId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 24ddd9e56..1deda066e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.flink.sink.FlinkSink;
-import org.apache.paimon.flink.sink.LogSinkFunction;
 import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
 import org.apache.paimon.flink.sink.StoreCommitter;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
@@ -34,8 +33,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.util.function.SerializableFunction;
 
-import javax.annotation.Nullable;
-
 /**
  * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema 
change if necessary.
  */
@@ -44,21 +41,16 @@ public class FlinkCdcSink extends FlinkSink<CdcRecord> {
     private static final long serialVersionUID = 1L;
 
     private final Lock.Factory lockFactory;
-    @Nullable private final LogSinkFunction logSinkFunction;
 
-    public FlinkCdcSink(
-            FileStoreTable table,
-            Lock.Factory lockFactory,
-            @Nullable LogSinkFunction logSinkFunction) {
+    public FlinkCdcSink(FileStoreTable table, Lock.Factory lockFactory) {
         super(table, false);
         this.lockFactory = lockFactory;
-        this.logSinkFunction = logSinkFunction;
     }
 
     @Override
     protected OneInputStreamOperator<CdcRecord, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
-        return new SchemaAwareStoreWriteOperator(table, logSinkFunction, 
writeProvider);
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new CdcRecordStoreWriteOperator(table, writeProvider, 
commitUser);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index 4e770e57c..6663acc54 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -18,14 +18,10 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.AbstractChannelComputer;
 import org.apache.paimon.flink.sink.BucketingStreamPartitioner;
-import org.apache.paimon.flink.sink.LogSinkFunction;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
@@ -48,7 +44,6 @@ public class FlinkCdcSinkBuilder<T> {
     private EventParser.Factory<T> parserFactory = null;
     private FileStoreTable table = null;
     private Lock.Factory lockFactory = Lock.emptyFactory();
-    @Nullable private LogSinkFunction logSinkFunction;
 
     @Nullable private Integer parallelism;
 
@@ -72,11 +67,6 @@ public class FlinkCdcSinkBuilder<T> {
         return this;
     }
 
-    public FlinkCdcSinkBuilder<T> withLogSinkFunction(@Nullable 
LogSinkFunction logSinkFunction) {
-        this.logSinkFunction = logSinkFunction;
-        return this;
-    }
-
     public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer 
parallelism) {
         this.parallelism = parallelism;
         return this;
@@ -100,14 +90,8 @@ public class FlinkCdcSinkBuilder<T> {
                                         new SchemaManager(table.fileIO(), 
table.location())));
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
 
-        TableSchema schema = table.schema();
-        boolean shuffleByPartitionEnable =
-                table.coreOptions()
-                        .toConfiguration()
-                        .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION);
         BucketingStreamPartitioner<CdcRecord> partitioner =
-                new BucketingStreamPartitioner<>(
-                        new ChannelComputerProvider(schema, 
shuffleByPartitionEnable));
+                new BucketingStreamPartitioner<>(new 
CdcRecordChannelComputer(table.schema()));
         PartitionTransformation<CdcRecord> partitioned =
                 new PartitionTransformation<>(parsed.getTransformation(), 
partitioner);
         if (parallelism != null) {
@@ -115,35 +99,7 @@ public class FlinkCdcSinkBuilder<T> {
         }
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory, 
logSinkFunction);
+        FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
     }
-
-    private static class ChannelComputerProvider
-            implements AbstractChannelComputer.Provider<CdcRecord> {
-
-        private static final long serialVersionUID = 1L;
-
-        private final TableSchema schema;
-        private final boolean shuffleByPartitionEnable;
-
-        private ChannelComputerProvider(TableSchema schema, boolean 
shuffleByPartitionEnable) {
-            this.schema = schema;
-            this.shuffleByPartitionEnable = shuffleByPartitionEnable;
-        }
-
-        @Override
-        public AbstractChannelComputer<CdcRecord> provide(int numChannels) {
-            return new CdcRecordChannelComputer(numChannels, schema, 
shuffleByPartitionEnable);
-        }
-
-        @Override
-        public String toString() {
-            if (shuffleByPartitionEnable) {
-                return "HASH[bucket, partition]";
-            } else {
-                return "HASH[bucket]";
-            }
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
deleted file mode 100644
index 5b986d4e7..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
-import org.apache.paimon.flink.sink.LogSinkFunction;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-
-import org.apache.flink.runtime.state.StateInitializationContext;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
-
-/**
- * An {@link AbstractStoreWriteOperator} which is aware of schema changes.
- *
- * <p>When the input {@link CdcRecord} contains a field name not in the 
current {@link TableSchema},
- * it periodically queries the latest schema, until the latest schema contains 
that field name.
- */
-public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<CdcRecord> {
-
-    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
-            ConfigOptions.key("cdc.retry-sleep-time")
-                    .durationType()
-                    .defaultValue(Duration.ofMillis(500));
-
-    private final long retrySleepMillis;
-
-    public SchemaAwareStoreWriteOperator(
-            FileStoreTable table,
-            @Nullable LogSinkFunction logSinkFunction,
-            StoreSinkWrite.Provider storeSinkWriteProvider) {
-        super(table, logSinkFunction, storeSinkWriteProvider);
-        retrySleepMillis = 
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws 
Exception {
-        table = table.copyWithLatestSchema();
-        super.initializeState(context);
-    }
-
-    @Override
-    protected SinkRecord processRecord(CdcRecord record) throws Exception {
-        Optional<GenericRow> optionalConverted = 
record.toGenericRow(table.schema().fields());
-        if (!optionalConverted.isPresent()) {
-            while (true) {
-                table = table.copyWithLatestSchema();
-                optionalConverted = 
record.toGenericRow(table.schema().fields());
-                if (optionalConverted.isPresent()) {
-                    break;
-                }
-                Thread.sleep(retrySleepMillis);
-            }
-            write.replace(commitUser -> table.newWrite(commitUser));
-        }
-
-        try {
-            return write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
deleted file mode 100644
index a5e240c12..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.CatalogITCaseBase;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.paimon.table.sink.TableWriteImpl;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-
-import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/** Tests of shuffle data by bucket and partition. */
-public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
-    private static final int TOTAL_SOURCE_RECORD_COUNT = 1000;
-
-    @BeforeEach
-    public void after() throws Exception {
-        super.before();
-        CollectStoreSinkWrite.writeRowsMap.clear();
-    }
-
-    @Override
-    protected List<String> ddl() {
-        return Collections.singletonList(
-                "CREATE TABLE T (a INT, b INT, c INT, d INT, PRIMARY KEY (a, 
b) NOT ENFORCED) PARTITIONED BY (a)");
-    }
-
-    @Test
-    public void testShuffleByBucket() throws Exception {
-        FileStoreTable table =
-                FileStoreTableFactory.create(LocalFileIO.create(), 
getTableDirectory("T"));
-
-        insertDataToTable(table);
-
-        // Only one task will write records shuffled by bucket
-        assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 1);
-    }
-
-    @Test
-    public void testShuffleByBucketPartition() throws Exception {
-        FileStoreTable originalTable =
-                FileStoreTableFactory.create(LocalFileIO.create(), 
getTableDirectory("T"));
-        Map<String, String> dynamicOptions = 
originalTable.coreOptions().toMap();
-        
dynamicOptions.put(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key(), 
"true");
-        FileStoreTable table = originalTable.copy(dynamicOptions);
-
-        insertDataToTable(table);
-
-        // Two tasks will write records shuffled by bucket and partition
-        assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 2);
-    }
-
-    private void insertDataToTable(FileStoreTable table) throws Exception {
-        RowType rowType = toLogicalType(table.rowType());
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(2);
-
-        List<RowData> sourceDataList = generateData();
-        DataStreamSource<RowData> sourceStream =
-                env.fromCollection(sourceDataList, 
InternalTypeInfo.of(rowType));
-        new FlinkSinkBuilder(table)
-                .withInput(sourceStream)
-                .withParallelism(env.getParallelism())
-                .withSinkProvider(
-                        "testUser",
-                        (StoreSinkWrite.Provider)
-                                (table1, context, ioManager) ->
-                                        (StoreSinkWrite) new 
CollectStoreSinkWrite())
-                .build();
-        env.execute();
-
-        assertEquals(
-                
CollectStoreSinkWrite.writeRowsMap.values().stream().mapToInt(List::size).sum(),
-                TOTAL_SOURCE_RECORD_COUNT);
-    }
-
-    private List<RowData> generateData() {
-        List<RowData> rowDataList = new ArrayList<>(TOTAL_SOURCE_RECORD_COUNT);
-        for (int i = 0; i < TOTAL_SOURCE_RECORD_COUNT; i++) {
-            rowDataList.add(GenericRowData.of(i, i + 1, i + 2, i + 3));
-        }
-        return rowDataList;
-    }
-
-    /** Collect all received data with writer. */
-    private static class CollectStoreSinkWrite implements StoreSinkWrite {
-        private static final Map<StoreSinkWrite, List<InternalRow>> 
writeRowsMap =
-                new ConcurrentHashMap<>();
-
-        @Override
-        public SinkRecord write(InternalRow rowData) throws Exception {
-            List<InternalRow> rows = writeRowsMap.computeIfAbsent(this, key -> 
new ArrayList<>());
-            rows.add(rowData);
-            return null;
-        }
-
-        @Override
-        public SinkRecord toLogRecord(SinkRecord record) {
-            return record;
-        }
-
-        @Override
-        public void compact(BinaryRow partition, int bucket, boolean 
fullCompaction)
-                throws Exception {}
-
-        @Override
-        public void notifyNewFiles(
-                long snapshotId, BinaryRow partition, int bucket, 
List<DataFileMeta> files) {}
-
-        @Override
-        public List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
-                throws IOException {
-            return Collections.emptyList();
-        }
-
-        @Override
-        public void snapshotState(StateSnapshotContext context) throws 
Exception {}
-
-        @Override
-        public void close() throws Exception {}
-
-        @Override
-        public void replace(Function<String, TableWriteImpl<?>> 
newWriteProvider)
-                throws Exception {}
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
new file mode 100644
index 000000000..710f8bece
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowDataChannelComputer}. */
+public class RowDataChannelComputerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testSchemaWithPartition() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
+                        new String[] {"pt", "k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                new HashMap<>(),
+                                ""));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numInputs = random.nextInt(1000) + 1;
+        List<RowData> input = new ArrayList<>();
+        for (int i = 0; i < numInputs; i++) {
+            input.add(
+                    GenericRowData.of(
+                            random.nextInt(10) + 1, random.nextLong(), 
random.nextDouble()));
+        }
+
+        testImpl(schema, input);
+    }
+
+    @Test
+    public void testSchemaNoPartition() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
+                        new String[] {"k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                new HashMap<>(),
+                                ""));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numInputs = random.nextInt(1000) + 1;
+        List<RowData> input = new ArrayList<>();
+        for (int i = 0; i < numInputs; i++) {
+            input.add(GenericRowData.of(random.nextLong(), 
random.nextDouble()));
+        }
+
+        testImpl(schema, input);
+    }
+
+    private void testImpl(TableSchema schema, List<RowData> input) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        RowDataKeyAndBucketExtractor extractor = new 
RowDataKeyAndBucketExtractor(schema);
+
+        int numChannels = random.nextInt(10) + 1;
+        boolean hasLogSink = random.nextBoolean();
+        RowDataChannelComputer channelComputer = new 
RowDataChannelComputer(schema, hasLogSink);
+        channelComputer.setup(numChannels);
+
+        // assert that channel(record) and channel(partition, bucket) gives 
the same result
+
+        for (RowData rowData : input) {
+            extractor.setRecord(rowData);
+            BinaryRow partition = extractor.partition();
+            int bucket = extractor.bucket();
+
+            assertThat(channelComputer.channel(rowData))
+                    .isEqualTo(channelComputer.channel(partition, bucket));
+        }
+
+        // assert that distribution should be even
+
+        int numTests = random.nextInt(10) + 1;
+        for (int test = 0; test < numTests; test++) {
+            Map<Integer, Integer> bucketsPerChannel = new HashMap<>();
+            for (int i = 0; i < numChannels; i++) {
+                bucketsPerChannel.put(i, 0);
+            }
+
+            extractor.setRecord(input.get(random.nextInt(input.size())));
+            BinaryRow partition = extractor.partition();
+
+            int numBuckets = random.nextInt(numChannels * 4) + 1;
+            for (int i = 0; i < numBuckets; i++) {
+                int channel = channelComputer.channel(partition, i);
+                bucketsPerChannel.compute(channel, (k, v) -> v + 1);
+            }
+
+            int max = 
bucketsPerChannel.values().stream().max(Integer::compareTo).get();
+            int min = 
bucketsPerChannel.values().stream().min(Integer::compareTo).get();
+            assertThat(max - min).isLessThanOrEqualTo(1);
+        }
+
+        // log sinks like Kafka only consider bucket and don't care about 
partition
+        // so same bucket, even from different partition, must go to the same 
channel
+
+        if (hasLogSink) {
+            Map<Integer, Set<Integer>> channelsPerBucket = new HashMap<>();
+            for (RowData rowData : input) {
+                extractor.setRecord(rowData);
+                int bucket = extractor.bucket();
+                channelsPerBucket
+                        .computeIfAbsent(bucket, k -> new HashSet<>())
+                        .add(channelComputer.channel(rowData));
+            }
+            for (Set<Integer> channels : channelsPerBucket.values()) {
+                assertThat(channels).hasSize(1);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index c9135ec2c..4748fc1d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -43,16 +43,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link FileStoreSink} when writing file store and with 
savepoints. */
 public class SinkSavepointITCase extends AbstractTestBase {
@@ -65,15 +64,21 @@ public class SinkSavepointITCase extends AbstractTestBase {
         path = getTempDirPath();
         // for failure tests
         failingName = UUID.randomUUID().toString();
-        FailingFileIO.reset(failingName, 100, 500);
     }
 
     @Test
-    @Timeout(180000)
+    @Timeout(180)
     public void testRecoverFromSavepoint() throws Exception {
         String failingPath = FailingFileIO.getFailingPath(failingName, path);
         String savepointPath = null;
+
         ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean enableFailure = random.nextBoolean();
+        if (enableFailure) {
+            FailingFileIO.reset(failingName, 100, 500);
+        } else {
+            FailingFileIO.reset(failingName, 0, 1);
+        }
 
         OUTER:
         while (true) {
@@ -117,7 +122,8 @@ public class SinkSavepointITCase extends AbstractTestBase {
             // recover from savepoint in the next round
         }
 
-        checkRecoverFromSavepointResult(failingPath);
+        checkRecoverFromSavepointBatchResult();
+        checkRecoverFromSavepointStreamingResult();
     }
 
     private JobClient runRecoverFromSavepointJob(String failingPath, String 
savepointPath)
@@ -139,20 +145,25 @@ public class SinkSavepointITCase extends AbstractTestBase 
{
         tEnv.getConfig()
                 .getConfiguration()
                 .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + 
path + "/checkpoint");
+        // input data must be strictly ordered for us to check changelog 
results
         tEnv.getConfig()
                 .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-        // we're creating multiple table environments in the same process
-        // if we do not set this option, stream node id will be different even 
with the same SQL
-        // if stream node id is different then we can't recover from savepoint
-        tEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        String createCatalogSql =
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_catalog WITH (",
+                        "  'type' = 'paimon',",
+                        "  'warehouse' = '" + failingPath + "'",
+                        ")");
+        FailingFileIO.retryArtificialException(() -> 
tEnv.executeSql(createCatalogSql));
+        tEnv.executeSql("USE CATALOG my_catalog");
 
         tEnv.executeSql(
                 String.join(
                         "\n",
-                        "CREATE TABLE S (",
+                        "CREATE TEMPORARY TABLE S (",
                         "  a INT",
                         ") WITH (",
                         "  'connector' = 'datagen',",
@@ -162,29 +173,26 @@ public class SinkSavepointITCase extends AbstractTestBase 
{
                         "  'fields.a.end' = '99999'",
                         ")"));
 
-        String createCatalogSql =
-                String.join(
-                        "\n",
-                        "CREATE CATALOG my_catalog WITH (",
-                        "  'type' = 'paimon',",
-                        "  'warehouse' = '" + failingPath + "'",
-                        ")");
-        FailingFileIO.retryArtificialException(() -> 
tEnv.executeSql(createCatalogSql));
-
-        tEnv.executeSql("USE CATALOG my_catalog");
-
         String createSinkSql =
                 String.join(
                         "\n",
                         "CREATE TABLE IF NOT EXISTS T (",
-                        "  a INT",
+                        "  k INT,",
+                        "  v INT,",
+                        "  PRIMARY KEY (k) NOT ENFORCED",
                         ") WITH (",
-                        "  'bucket' = '2',",
-                        "  'file.format' = 'avro'",
+                        "  'bucket' = '4',",
+                        "  'file.format' = 'avro',",
+                        "  'changelog-producer' = 'full-compaction',",
+                        "  'full-compaction.delta-commits' = '3'",
                         ")");
         FailingFileIO.retryArtificialException(() -> 
tEnv.executeSql(createSinkSql));
 
-        String insertIntoSql = "INSERT INTO T SELECT * FROM 
default_catalog.default_database.S";
+        // test changing sink parallelism by using a random parallelism
+        String insertIntoSql =
+                String.format(
+                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '%d') 
*/ SELECT (a %% 15000) AS k, a AS v FROM S",
+                        ThreadLocalRandom.current().nextInt(3) + 2);
         JobClient jobClient =
                 FailingFileIO.retryArtificialException(() -> 
tEnv.executeSql(insertIntoSql))
                         .getJobClient()
@@ -196,32 +204,89 @@ public class SinkSavepointITCase extends AbstractTestBase 
{
         return jobClient;
     }
 
-    private void checkRecoverFromSavepointResult(String failingPath) throws 
Exception {
+    private void checkRecoverFromSavepointBatchResult() throws Exception {
         EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
         TableEnvironment tEnv = TableEnvironment.create(settings);
-        // no failure should occur when checking for answer
-        FailingFileIO.reset(failingName, 0, 1);
 
-        String createCatalogSql =
+        tEnv.executeSql(
                 String.join(
                         "\n",
                         "CREATE CATALOG my_catalog WITH (",
                         "  'type' = 'paimon',",
-                        "  'warehouse' = '" + failingPath + "'",
-                        ")");
-        tEnv.executeSql(createCatalogSql);
-
+                        "  'warehouse' = '" + path + "'",
+                        ")"));
         tEnv.executeSql("USE CATALOG my_catalog");
 
-        List<Integer> actual = new ArrayList<>();
+        Map<Integer, Integer> expected = new HashMap<>();
+        for (int i = 0; i < 100000; i++) {
+            expected.put(i % 15000, i);
+        }
+
+        Map<Integer, Integer> actual = new HashMap<>();
         try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM 
T").collect()) {
             while (it.hasNext()) {
                 Row row = it.next();
-                assertEquals(1, row.getArity());
-                actual.add((Integer) row.getField(0));
+                assertThat(row.getArity()).isEqualTo(2);
+                actual.put((Integer) row.getField(0), (Integer) 
row.getField(1));
+            }
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private void checkRecoverFromSavepointStreamingResult() throws Exception {
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_catalog WITH (",
+                        "  'type' = 'paimon',",
+                        "  'warehouse' = '" + path + "'",
+                        ")"));
+        tEnv.executeSql("USE CATALOG my_catalog");
+
+        Map<Integer, Integer> expected = new HashMap<>();
+        for (int i = 0; i < 100000; i++) {
+            expected.put(i % 15000, i);
+        }
+        Set<Integer> expectedValues = new HashSet<>(expected.values());
+
+        int endCount = 0;
+        Map<Integer, Integer> actual = new HashMap<>();
+        try (CloseableIterator<Row> it =
+                tEnv.executeSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id' = '2') */")
+                        .collect()) {
+            while (it.hasNext()) {
+                Row row = it.next();
+                assertThat(row.getArity()).isEqualTo(2);
+
+                int k = (Integer) row.getField(0);
+                int v = (Integer) row.getField(1);
+                switch (row.getKind()) {
+                    case INSERT:
+                    case UPDATE_AFTER:
+                        assertThat(actual).doesNotContainKey(k);
+                        actual.put(k, v);
+                        break;
+                    case DELETE:
+                    case UPDATE_BEFORE:
+                        assertThat(actual).containsKey(k);
+                        actual.remove(k);
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown row kind " + row.getKind());
+                }
+
+                if (expectedValues.contains(v)) {
+                    endCount++;
+                }
+                if (endCount >= expectedValues.size()) {
+                    break;
+                }
             }
         }
-        Collections.sort(actual);
-        assertEquals(IntStream.range(0, 
100000).boxed().collect(Collectors.toList()), actual);
+        assertThat(actual).isEqualTo(expected);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
new file mode 100644
index 000000000..ccfc4adad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CdcRecordChannelComputer}. */
+public class CdcRecordChannelComputerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testSchemaWithPartition() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
+                        new String[] {"pt", "k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                new HashMap<>(),
+                                ""));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numInputs = random.nextInt(1000) + 1;
+        List<Map<String, String>> input = new ArrayList<>();
+        for (int i = 0; i < numInputs; i++) {
+            Map<String, String> fields = new HashMap<>();
+            fields.put("pt", String.valueOf(random.nextInt(10) + 1));
+            fields.put("k", String.valueOf(random.nextLong()));
+            fields.put("v", String.valueOf(random.nextDouble()));
+            input.add(fields);
+        }
+
+        testImpl(schema, input);
+    }
+
+    @Test
+    public void testSchemaNoPartition() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
+                        new String[] {"k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                new HashMap<>(),
+                                ""));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numInputs = random.nextInt(1000) + 1;
+        List<Map<String, String>> input = new ArrayList<>();
+        for (int i = 0; i < numInputs; i++) {
+            Map<String, String> fields = new HashMap<>();
+            fields.put("k", String.valueOf(random.nextLong()));
+            fields.put("v", String.valueOf(random.nextDouble()));
+            input.add(fields);
+        }
+
+        testImpl(schema, input);
+    }
+
+    private void testImpl(TableSchema schema, List<Map<String, String>> input) 
{
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        CdcRecordKeyAndBucketExtractor extractor = new 
CdcRecordKeyAndBucketExtractor(schema);
+
+        int numChannels = random.nextInt(10) + 1;
+        CdcRecordChannelComputer channelComputer = new 
CdcRecordChannelComputer(schema);
+        channelComputer.setup(numChannels);
+
+        // assert that channel(record) and channel(partition, bucket) gives 
the same result
+
+        for (Map<String, String> fields : input) {
+            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
+            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+
+            extractor.setRecord(random.nextBoolean() ? insertRecord : 
deleteRecord);
+            BinaryRow partition = extractor.partition();
+            int bucket = extractor.bucket();
+
+            assertThat(channelComputer.channel(insertRecord))
+                    .isEqualTo(channelComputer.channel(partition, bucket));
+            assertThat(channelComputer.channel(deleteRecord))
+                    .isEqualTo(channelComputer.channel(partition, bucket));
+        }
+
+        // assert that distribution should be even
+
+        int numTests = random.nextInt(10) + 1;
+        for (int test = 0; test < numTests; test++) {
+            Map<Integer, Integer> bucketsPerChannel = new HashMap<>();
+            for (int i = 0; i < numChannels; i++) {
+                bucketsPerChannel.put(i, 0);
+            }
+
+            Map<String, String> fields = 
input.get(random.nextInt(input.size()));
+            extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
+            BinaryRow partition = extractor.partition();
+
+            int numBuckets = random.nextInt(numChannels * 4) + 1;
+            for (int i = 0; i < numBuckets; i++) {
+                int channel = channelComputer.channel(partition, i);
+                bucketsPerChannel.compute(channel, (k, v) -> v + 1);
+            }
+
+            int max = 
bucketsPerChannel.values().stream().max(Integer::compareTo).get();
+            int min = 
bucketsPerChannel.values().stream().min(Integer::compareTo).get();
+            assertThat(max - min).isLessThanOrEqualTo(1);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 38ec56338..5ed9f70a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -62,8 +62,8 @@ import java.util.function.Predicate;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link SchemaAwareStoreWriteOperator}. */
-public class SchemaAwareStoreWriteOperatorTest {
+/** Tests for {@link CdcRecordStoreWriteOperator}. */
+public class CdcRecordStoreWriteOperatorTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -252,13 +252,13 @@ public class SchemaAwareStoreWriteOperatorTest {
 
     private OneInputStreamOperatorTestHarness<CdcRecord, Committable> 
createTestHarness(
             FileStoreTable table) throws Exception {
-        SchemaAwareStoreWriteOperator operator =
-                new SchemaAwareStoreWriteOperator(
+        CdcRecordStoreWriteOperator operator =
+                new CdcRecordStoreWriteOperator(
                         table,
-                        null,
-                        (t, context, ioManager) ->
+                        (t, commitUser, state, ioManager) ->
                                 new StoreSinkWriteImpl(
-                                        t, context, commitUser, ioManager, 
false, false));
+                                        t, commitUser, state, ioManager, 
false, false),
+                        commitUser);
         TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
         TypeSerializer<Committable> outputSerializer =
                 new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
@@ -271,7 +271,7 @@ public class SchemaAwareStoreWriteOperatorTest {
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> partitions, List<String> 
primaryKeys) throws Exception {
         Options conf = new Options();
-        conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, 
Duration.ofMillis(10));
+        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, 
Duration.ofMillis(10));
 
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index bc600a86f..eaac7448b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -226,7 +225,6 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
         conf.set(CoreOptions.BUCKET, numBucket);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
-        conf.set(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION, 
shuffleByPartitionEnable);
 
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(

Reply via email to