This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e8318a2c [flink] Introduce Dynamic bucket sink to Flink (#1357)
3e8318a2c is described below

commit 3e8318a2c6da4521e4354315b1e8ff6ac5413585
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 13 10:00:38 2023 +0800

    [flink] Introduce Dynamic bucket sink to Flink (#1357)
---
 docs/content/concepts/primary-key-table.md         | 11 +++
 docs/content/how-to/cdc-ingestion.md               |  2 +-
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 +++
 .../flink/sink/DynamicBucketRowWriteOperator.java  | 52 ++++++++++++
 .../paimon/flink/sink/DynamicBucketSink.java       | 86 ++++++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 39 ++++-----
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  8 ++
 .../flink/sink/HashBucketAssignerOperator.java     | 92 ++++++++++++++++++++++
 .../paimon/flink/sink/RowDynamicBucketSink.java    | 68 ++++++++++++++++
 .../paimon/flink/sink/TableWriteOperator.java      | 25 +++---
 .../flink/sink/cdc/CdcDynamicBucketSink.java       | 80 +++++++++++++++++++
 ...tor.java => CdcDynamicBucketWriteOperator.java} | 28 +++----
 .../flink/sink/cdc/CdcHashKeyChannelComputer.java  | 55 +++++++++++++
 .../sink/cdc/CdcRecordKeyAndBucketExtractor.java   | 15 +++-
 .../sink/cdc/CdcRecordStoreWriteOperator.java      |  2 +-
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      | 33 ++++----
 .../sink/cdc/CdcWithBucketChannelComputer.java     | 56 +++++++++++++
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 48 +++++------
 .../paimon/flink/DynamicBucketTableITCase.java     | 68 ++++++++++++++++
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   | 15 +++-
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      | 12 ++-
 22 files changed, 719 insertions(+), 94 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index c1cd5f920..42bd87936 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -32,6 +32,17 @@ Primary keys consist of a set of columns that contain unique 
values for each rec
 
 By [defining primary keys]({{< ref 
"how-to/creating-tables#tables-with-primary-keys" >}}) on a changelog table, 
users can access the following features.
 
+## Bucket
+
+A bucket is the smallest storage unit for reads and writes; each bucket directory contains an [LSM tree]({{< ref "concepts/file-layouts#lsm-trees" >}}).
+
+Primary Key Table supports two bucket modes:
+1. Fixed Bucket mode: configure a bucket number greater than 0. Rescaling buckets can only be done through offline processes;
+   see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). Too many buckets lead to too many
+   small files, while too few buckets lead to poor write performance.
+2. Dynamic Bucket mode: configure `'bucket' = '-1'`. Paimon dynamically maintains the index, keeping the data volume
+   in each bucket below `'dynamic-bucket.target-row-num'`. (This is an experimental feature.)
+
 ## Merge Engines
 
 When Paimon sink receives two or more records with the same primary keys, it 
will merge them into one record to keep primary keys unique. By specifying the 
`merge-engine` table property, users can choose how records are merged together.
diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index acd5edc1a..9f1039c76 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -176,7 +176,7 @@ The command to recover from previous snapshot and add new 
tables to synchronize
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    --fromSavepoint {{< savepointPath >}} \
+    --fromSavepoint savepointPath \
     /path/to/paimon-flink-action-{{< version >}}.jar \
     mysql-sync-database \
     --warehouse hdfs:///path/to/warehouse \
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 872a8da15..75debc1ac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@ under the License.
             <td>Duration</td>
             <td>The discovery interval of continuous reading.</td>
         </tr>
+        <tr>
+            <td><h5>dynamic-bucket.target-row-num</h5></td>
+            <td style="word-wrap: break-word;">2000000</td>
+            <td>Long</td>
+            <td>If the bucket is -1, for primary key table, is dynamic bucket 
mode, this option controls the target row number for one bucket.</td>
+        </tr>
         <tr>
             <td><h5>dynamic-partition-overwrite</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index d38150de0..0d195ac39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -648,6 +648,14 @@ public class CoreOptions implements Serializable {
                             "The expiration interval of consumer files. A 
consumer file will be expired if "
                                     + "it's lifetime after last modification 
is over this value.");
 
+    public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
+            key("dynamic-bucket.target-row-num")
+                    .longType()
+                    .defaultValue(2_000_000L)
+                    .withDescription(
+                            "If the bucket is -1, for primary key table, is 
dynamic bucket mode, "
+                                    + "this option controls the target row 
number for one bucket.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -837,6 +845,10 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
     }
 
+    public long dynamicBucketTargetRowNum() {
+        return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
+    }
+
     public ChangelogProducer changelogProducer() {
         return options.get(CHANGELOG_PRODUCER);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
new file mode 100644
index 000000000..7db9c2802
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/** A {@link PrepareCommitOperator} to write {@link RowData} with bucket. 
Record schema is fixed. */
+public class DynamicBucketRowWriteOperator extends 
TableWriteOperator<Tuple2<RowData, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    public DynamicBucketRowWriteOperator(
+            FileStoreTable table,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        super(table, storeSinkWriteProvider, initialCommitUser);
+    }
+
+    @Override
+    protected boolean containLogSystem() {
+        return false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<Tuple2<RowData, Integer>> element) 
throws Exception {
+        FlinkRowWrapper internalRow = new 
FlinkRowWrapper(element.getValue().f0);
+        DynamicBucketRow row = new DynamicBucketRow(internalRow, 
element.getValue().f1);
+        write.write(row);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
new file mode 100644
index 000000000..e4f804832
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.paimon.flink.sink.FlinkStreamPartitioner.createPartitionTransformation;
+
+/** Sink for dynamic bucket table. */
+public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, 
Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    public DynamicBucketSink(
+            FileStoreTable table,
+            Lock.Factory lockFactory,
+            @Nullable Map<String, String> overwritePartition) {
+        super(table, overwritePartition, lockFactory);
+    }
+
+    protected abstract ChannelComputer<T> channelComputer1();
+
+    protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();
+
+    protected abstract SerializableFunction<TableSchema, 
PartitionKeyExtractor<T>>
+            extractorFunction();
+
+    public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer 
parallelism) {
+        String initialCommitUser = UUID.randomUUID().toString();
+
+        // Topology:
+        // input -- shuffle by key hash --> bucket-assigner -- shuffle by 
bucket --> writer -->
+        // committer
+
+        // 1. shuffle by key hash
+        DataStream<T> partitionByKeyHash =
+                createPartitionTransformation(input, channelComputer1(), 
parallelism);
+
+        // 2. bucket-assigner
+        HashBucketAssignerOperator<T> assignerOperator =
+                new HashBucketAssignerOperator<>(initialCommitUser, table, 
extractorFunction());
+        TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
+                new TupleTypeInfo<>(partitionByKeyHash.getType(), 
BasicTypeInfo.INT_TYPE_INFO);
+        DataStream<Tuple2<T, Integer>> bucketAssigned =
+                partitionByKeyHash.transform(
+                        "dynamic-bucket-assigner", rowWithBucketType, 
assignerOperator);
+
+        // 3. shuffle by bucket
+        DataStream<Tuple2<T, Integer>> partitionByBucket =
+                createPartitionTransformation(bucketAssigned, 
channelComputer2(), parallelism);
+
+        // 4. writer and committer
+        return sinkFrom(partitionByBucket, initialCommitUser);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 6ab2ddd4c..a88523219 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -125,7 +125,16 @@ public abstract class FlinkSink<T> implements Serializable 
{
         return sinkFrom(input, initialCommitUser);
     }
 
-    public DataStreamSink<?> sinkFrom(DataStream<T> input, String commitUser) {
+    public DataStreamSink<?> sinkFrom(DataStream<T> input, String 
initialCommitUser) {
+
+        return sinkFrom(
+                input,
+                initialCommitUser,
+                
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
+    }
+
+    public DataStreamSink<?> sinkFrom(
+            DataStream<T> input, String commitUser, StoreSinkWrite.Provider 
sinkProvider) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -138,24 +147,12 @@ public abstract class FlinkSink<T> implements 
Serializable {
             assertCheckpointConfiguration(env);
         }
 
-        SingleOutputStreamOperator<Committable> written =
-                handleInput(input, isStreaming, commitUser);
-
-        return commit(written, streamingCheckpointEnabled, commitUser);
-    }
-
-    protected SingleOutputStreamOperator<Committable> handleInput(
-            DataStream<T> input, boolean isStreaming, String commitUser) {
+        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
         SingleOutputStreamOperator<Committable> written =
                 input.transform(
                                 WRITER_NAME + " -> " + table.name(),
-                                new CommittableTypeInfo(),
-                                createWriteOperator(
-                                        createWriteProvider(
-                                                input.getExecutionEnvironment()
-                                                        
.getCheckpointConfig()),
-                                        isStreaming,
-                                        commitUser))
+                                typeInfo,
+                                createWriteOperator(sinkProvider, isStreaming, 
commitUser))
                         .setParallelism(input.getParallelism());
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -164,17 +161,11 @@ public abstract class FlinkSink<T> implements 
Serializable {
                     .declareManagedMemoryUseCaseAtOperatorScope(
                             ManagedMemoryUseCase.OPERATOR, 
memorySize.getMebiBytes());
         }
-        return written;
-    }
 
-    protected DataStreamSink<?> commit(
-            SingleOutputStreamOperator<Committable> written,
-            boolean streamingCheckpointEnabled,
-            String commitUser) {
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME + " -> " + table.name(),
-                                new CommittableTypeInfo(),
+                                typeInfo,
                                 new CommitterOperator<>(
                                         streamingCheckpointEnabled,
                                         commitUser,
@@ -185,7 +176,7 @@ public abstract class FlinkSink<T> implements Serializable {
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    public static void 
assertCheckpointConfiguration(StreamExecutionEnvironment env) {
+    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) 
{
         Preconditions.checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. 
Please set "
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 84f031614..1aa48d805 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import java.util.Map;
 
 import static 
org.apache.paimon.flink.sink.FlinkStreamPartitioner.createPartitionTransformation;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Builder for {@link FileStoreSink}. */
 public class FlinkSinkBuilder {
@@ -78,12 +79,19 @@ public class FlinkSinkBuilder {
             case FIXED:
                 return buildForFixedBucket();
             case DYNAMIC:
+                return buildDynamicBucketSink();
             case UNAWARE:
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
         }
     }
 
+    private DataStreamSink<?> buildDynamicBucketSink() {
+        checkArgument(logSinkFunction == null, "Dynamic bucket mode can not 
work with log system.");
+        return new RowDynamicBucketSink(table, lockFactory, overwritePartition)
+                .build(input, parallelism);
+    }
+
     private DataStreamSink<?> buildForFixedBucket() {
         DataStream<RowData> partitioned =
                 createPartitionTransformation(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
new file mode 100644
index 000000000..2d256d9bf
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/** Assign bucket for the input record, output record with bucket. */
+public class HashBucketAssignerOperator<T> extends 
AbstractStreamOperator<Tuple2<T, Integer>>
+        implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String initialCommitUser;
+
+    private final AbstractFileStoreTable table;
+    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction;
+
+    private transient HashBucketAssigner assigner;
+    private transient PartitionKeyExtractor<T> extractor;
+
+    public HashBucketAssignerOperator(
+            String commitUser,
+            Table table,
+            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction) {
+        this.initialCommitUser = commitUser;
+        this.table = (AbstractFileStoreTable) table;
+        this.extractorFunction = extractorFunction;
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Each job can only have one user name and this name must be 
consistent across restarts.
+        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        String commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, 
initialCommitUser);
+
+        this.assigner =
+                new HashBucketAssigner(
+                        table.snapshotManager(),
+                        commitUser,
+                        table.store().newIndexFileHandler(),
+                        getRuntimeContext().getNumberOfParallelSubtasks(),
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        table.coreOptions().dynamicBucketTargetRowNum());
+        this.extractor = extractorFunction.apply(table.schema());
+    }
+
+    @Override
+    public void processElement(StreamRecord<T> streamRecord) throws Exception {
+        T value = streamRecord.getValue();
+        int bucket =
+                assigner.assign(
+                        extractor.partition(value), 
extractor.trimmedPrimaryKey(value).hashCode());
+        output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) {
+        assigner.prepareCommit(checkpointId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
new file mode 100644
index 000000000..b6678a5df
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Sink for dynamic bucket table. */
+public class RowDynamicBucketSink extends DynamicBucketSink<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    public RowDynamicBucketSink(
+            FileStoreTable table,
+            Lock.Factory lockFactory,
+            @Nullable Map<String, String> overwritePartition) {
+        super(table, lockFactory, overwritePartition);
+    }
+
+    @Override
+    protected ChannelComputer<RowData> channelComputer1() {
+        return new RowHashKeyChannelComputer(table.schema());
+    }
+
+    @Override
+    protected ChannelComputer<Tuple2<RowData, Integer>> channelComputer2() {
+        return new RowWithBucketChannelComputer(table.schema());
+    }
+
+    @Override
+    protected SerializableFunction<TableSchema, PartitionKeyExtractor<RowData>>
+            extractorFunction() {
+        return RowDataPartitionKeyExtractor::new;
+    }
+
+    @Override
+    protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> 
createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new DynamicBucketRowWriteOperator(table, writeProvider, 
commitUser);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 1c7f53a5e..b7ed60f68 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -59,15 +60,17 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
-        RowDataChannelComputer channelComputer =
-                new RowDataChannelComputer(table.schema(), containLogSystem());
-        
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
-        state =
-                new StoreSinkWriteState(
-                        context,
-                        (tableName, partition, bucket) ->
-                                channelComputer.channel(partition, bucket)
-                                        == 
getRuntimeContext().getIndexOfThisSubtask());
+        boolean containLogSystem = containLogSystem();
+        int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        StateValueFilter stateFilter =
+                (tableName, partition, bucket) -> {
+                    int task =
+                            containLogSystem
+                                    ? ChannelComputer.select(bucket, numTasks)
+                                    : ChannelComputer.select(partition, 
bucket, numTasks);
+                    return task == getRuntimeContext().getIndexOfThisSubtask();
+                };
+        state = new StoreSinkWriteState(context, stateFilter);
 
         write =
                 storeSinkWriteProvider.provide(
@@ -91,7 +94,9 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
     @Override
     public void close() throws Exception {
         super.close();
-        write.close();
+        if (write != null) {
+            write.close();
+        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
new file mode 100644
index 000000000..02cd8c105
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.DynamicBucketSink;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/** Sink for dynamic bucket table. */
+public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    public CdcDynamicBucketSink(FileStoreTable table, Lock.Factory 
lockFactory) {
+        super(table, lockFactory, null);
+    }
+
+    @Override
+    protected ChannelComputer<CdcRecord> channelComputer1() {
+        return new CdcHashKeyChannelComputer(table.schema());
+    }
+
+    @Override
+    protected ChannelComputer<Tuple2<CdcRecord, Integer>> channelComputer2() {
+        return new CdcWithBucketChannelComputer(table.schema());
+    }
+
+    @Override
+    protected SerializableFunction<TableSchema, 
PartitionKeyExtractor<CdcRecord>>
+            extractorFunction() {
+        return schema -> {
+            CdcRecordKeyAndBucketExtractor extractor = new 
CdcRecordKeyAndBucketExtractor(schema);
+            return new PartitionKeyExtractor<CdcRecord>() {
+                @Override
+                public BinaryRow partition(CdcRecord record) {
+                    extractor.setRecord(record);
+                    return extractor.partition();
+                }
+
+                @Override
+                public BinaryRow trimmedPrimaryKey(CdcRecord record) {
+                    extractor.setRecord(record);
+                    return extractor.trimmedPrimaryKey();
+                }
+            };
+        };
+    }
+
+    @Override
+    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> 
createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new CdcDynamicBucketWriteOperator(table, writeProvider, 
commitUser);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
similarity index 73%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index 04ae9ea38..64fb65910 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -22,35 +22,29 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.TableWriteOperator;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.DynamicBucketRow;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Optional;
 
+import static 
org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
- * A {@link PrepareCommitOperator} to write {@link CdcRecord}. Record schema 
may change. If current
- * known schema does not fit record schema, this operator will wait for schema 
changes.
+ * A {@link PrepareCommitOperator} to write {@link CdcRecord} with bucket. 
Record schema may change.
  */
-public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> 
{
+public class CdcDynamicBucketWriteOperator extends 
TableWriteOperator<Tuple2<CdcRecord, Integer>> {
 
     private static final long serialVersionUID = 1L;
 
-    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
-            ConfigOptions.key("cdc.retry-sleep-time")
-                    .durationType()
-                    .defaultValue(Duration.ofMillis(500));
-
     private final long retrySleepMillis;
 
-    public CdcRecordStoreWriteOperator(
+    public CdcDynamicBucketWriteOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
@@ -71,13 +65,13 @@ public class CdcRecordStoreWriteOperator extends 
TableWriteOperator<CdcRecord> {
     }
 
     @Override
-    public void processElement(StreamRecord<CdcRecord> element) throws 
Exception {
-        CdcRecord record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record, 
table.schema().fields());
+    public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> 
element) throws Exception {
+        Tuple2<CdcRecord, Integer> record = element.getValue();
+        Optional<GenericRow> optionalConverted = toGenericRow(record.f0, 
table.schema().fields());
         if (!optionalConverted.isPresent()) {
             while (true) {
                 table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record, 
table.schema().fields());
+                optionalConverted = toGenericRow(record.f0, 
table.schema().fields());
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -87,7 +81,7 @@ public class CdcRecordStoreWriteOperator extends 
TableWriteOperator<CdcRecord> {
         }
 
         try {
-            write.write(optionalConverted.get());
+            write.write(new DynamicBucketRow(optionalConverted.get(), 
record.f1));
         } catch (Exception e) {
             throw new IOException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
new file mode 100644
index 000000000..918233749
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+
+/** Routes a {@link CdcRecord} to a channel by the hash of its trimmed primary key. */
+public class CdcHashKeyChannelComputer implements ChannelComputer<CdcRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels; // set in setup()
+    private transient CdcRecordKeyAndBucketExtractor extractor; // created in setup()
+
+    public CdcHashKeyChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
+
+    @Override
+    public int channel(CdcRecord record) {
+        extractor.setRecord(record);
+        int hash = extractor.trimmedPrimaryKey().hashCode();
+        return Math.abs(hash % numChannels); // remainder lies in (-n, n), so abs yields [0, n)
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by key hash";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index 9bd00f966..f3ab1654e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -41,10 +41,13 @@ public class CdcRecordKeyAndBucketExtractor implements 
KeyAndBucketExtractor<Cdc
     private final Projection partitionProjection;
     private final List<DataField> bucketKeyFields;
     private final Projection bucketKeyProjection;
+    private final List<DataField> trimmedPKFields;
+    private final Projection trimmedPKProjection;
 
     private CdcRecord record;
 
     private BinaryRow partition;
+    private BinaryRow trimmedPK;
     private BinaryRow bucketKey;
     private Integer bucket;
 
@@ -62,6 +65,12 @@ public class CdcRecordKeyAndBucketExtractor implements 
KeyAndBucketExtractor<Cdc
         this.bucketKeyProjection =
                 CodeGenUtils.newProjection(
                         bucketKeyType, IntStream.range(0, 
bucketKeyType.getFieldCount()).toArray());
+
+        this.trimmedPKFields = schema.trimmedPrimaryKeysFields();
+        this.trimmedPKProjection =
+                CodeGenUtils.newProjection(
+                        new RowType(trimmedPKFields),
+                        IntStream.range(0, trimmedPKFields.size()).toArray());
     }
 
     @Override
@@ -70,6 +79,7 @@ public class CdcRecordKeyAndBucketExtractor implements 
KeyAndBucketExtractor<Cdc
 
         this.partition = null;
         this.bucketKey = null;
+        this.trimmedPK = null;
         this.bucket = null;
     }
 
@@ -96,7 +106,10 @@ public class CdcRecordKeyAndBucketExtractor implements 
KeyAndBucketExtractor<Cdc
 
     @Override
     public BinaryRow trimmedPrimaryKey() {
-        throw new UnsupportedOperationException();
+        if (trimmedPK == null) { // cached until the fields are reset to null
+            trimmedPK = trimmedPKProjection.apply(projectAsInsert(record, 
trimmedPKFields));
+        }
+        return trimmedPK;
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index 04ae9ea38..dd0aa2e56 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -43,7 +43,7 @@ public class CdcRecordStoreWriteOperator extends 
TableWriteOperator<CdcRecord> {
 
     private static final long serialVersionUID = 1L;
 
-    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+    public static final ConfigOption<Duration> RETRY_SLEEP_TIME =
             ConfigOptions.key("cdc.retry-sleep-time")
                     .durationType()
                     .defaultValue(Duration.ofMillis(500));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 8a35101f7..06051664e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -88,20 +88,6 @@ public class CdcSinkBuilder<T> {
 
         FileStoreTable dataTable = (FileStoreTable) table;
 
-        BucketMode bucketMode = dataTable.bucketMode();
-        switch (bucketMode) {
-            case FIXED:
-                return buildForFixedBucket();
-            case DYNAMIC:
-            case UNAWARE:
-            default:
-                throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
-        }
-    }
-
-    private DataStreamSink<?> buildForFixedBucket() {
-        FileStoreTable dataTable = (FileStoreTable) table;
-
         SingleOutputStreamOperator<CdcRecord> parsed =
                 input.forward()
                         .process(new 
CdcParsingProcessFunction<>(parserFactory))
@@ -117,6 +103,25 @@ public class CdcSinkBuilder<T> {
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
+        BucketMode bucketMode = dataTable.bucketMode();
+        switch (bucketMode) {
+            case FIXED:
+                return buildForFixedBucket(parsed);
+            case DYNAMIC:
+                return buildForDynamicBucket(parsed);
+            case UNAWARE:
+            default:
+                throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
+        }
+    }
+
+    private DataStreamSink<?> buildForDynamicBucket(DataStream<CdcRecord> 
parsed) {
+        return new CdcDynamicBucketSink((FileStoreTable) table, lockFactory)
+                .build(parsed, parallelism); // DYNAMIC bucket mode, see the switch above
+    }
+
+    private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> 
parsed) {
+        FileStoreTable dataTable = (FileStoreTable) table;
         FlinkStreamPartitioner<CdcRecord> partitioner =
                 new FlinkStreamPartitioner<>(new 
CdcRecordChannelComputer(dataTable.schema()));
         PartitionTransformation<CdcRecord> partitioned =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
new file mode 100644
index 000000000..a24e6e3a0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Routes a {@link CdcRecord} with its bucket to a channel by (partition, bucket). */
+public class CdcWithBucketChannelComputer implements 
ChannelComputer<Tuple2<CdcRecord, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels; // set in setup()
+    private transient CdcRecordKeyAndBucketExtractor extractor; // created in setup()
+
+    public CdcWithBucketChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
+
+    @Override
+    public int channel(Tuple2<CdcRecord, Integer> record) {
+        extractor.setRecord(record.f0); // f0: the CDC record
+        return ChannelComputer.select(extractor.partition(), record.f1, 
numChannels); // f1: bucket
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by partition & bucket";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index d01bb1696..dd3ca0ae7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 
 import javax.annotation.Nullable;
@@ -84,20 +83,37 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         Preconditions.checkNotNull(input);
         Preconditions.checkNotNull(parserFactory);
 
-        StreamExecutionEnvironment env = input.getExecutionEnvironment();
-
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(new 
CdcMultiTableParsingProcessFunction<>(parserFactory))
                         .setParallelism(input.getParallelism());
 
         for (FileStoreTable table : tables) {
+            DataStream<Void> schemaChangeProcessFunction =
+                    SingleOutputStreamOperatorUtils.getSideOutput(
+                                    parsed,
+                                    CdcMultiTableParsingProcessFunction
+                                            
.createUpdatedDataFieldsOutputTag(table.name()))
+                            .process(
+                                    new UpdatedDataFieldsProcessFunction(
+                                            new SchemaManager(table.fileIO(), 
table.location())));
+            schemaChangeProcessFunction.getTransformation().setParallelism(1);
+            
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+
+            DataStream<CdcRecord> parsedForTable =
+                    SingleOutputStreamOperatorUtils.getSideOutput(
+                            parsed,
+                            
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
+                                    table.name()));
+
             BucketMode bucketMode = table.bucketMode();
             switch (bucketMode) {
                 case FIXED:
-                    buildForFixedBucket(table, parsed);
+                    buildForFixedBucket(table, parsedForTable);
                     break;
                 case DYNAMIC:
+                    buildForDynamicBucket(table, parsedForTable);
+                    break;
                 case UNAWARE:
                 default:
                     throw new UnsupportedOperationException(
@@ -106,29 +122,15 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         }
     }
 
-    private void buildForFixedBucket(
-            FileStoreTable table, SingleOutputStreamOperator<Void> parsed) {
-        DataStream<Void> schemaChangeProcessFunction =
-                SingleOutputStreamOperatorUtils.getSideOutput(
-                                parsed,
-                                CdcMultiTableParsingProcessFunction
-                                        
.createUpdatedDataFieldsOutputTag(table.name()))
-                        .process(
-                                new UpdatedDataFieldsProcessFunction(
-                                        new SchemaManager(table.fileIO(), 
table.location())));
-        schemaChangeProcessFunction.getTransformation().setParallelism(1);
-        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+    private void buildForDynamicBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
+        new CdcDynamicBucketSink(table, lockFactory).build(parsed, 
parallelism); // per-table sink
+    }
 
+    private void buildForFixedBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
         FlinkStreamPartitioner<CdcRecord> partitioner =
                 new FlinkStreamPartitioner<>(new 
CdcRecordChannelComputer(table.schema()));
         PartitionTransformation<CdcRecord> partitioned =
-                new PartitionTransformation<>(
-                        SingleOutputStreamOperatorUtils.getSideOutput(
-                                        parsed,
-                                        
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
-                                                table.name()))
-                                .getTransformation(),
-                        partitioner);
+                new PartitionTransformation<>(parsed.getTransformation(), 
partitioner);
         if (parallelism != null) {
             partitioned.setParallelism(parallelism);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
new file mode 100644
index 000000000..4b55537f2
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for tables in dynamic bucket mode ({@code 'bucket'='-1'}). */
+public class DynamicBucketTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "pt INT, "
+                        + "pk INT, "
+                        + "v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3' "
+                        + ")");
+    }
+
+    @Test
+    public void testWriteRead() {
+        sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 1),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 3),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+        sql("INSERT INTO T VALUES (1, 3, 33), (1, 1, 11)"); // updates existing keys
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 11),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 33),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+
+        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1)); // 5 keys, target-row-num 3
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 1df02a2f1..d3cf58e0f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -53,6 +53,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 
 /** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
 public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
@@ -65,6 +66,16 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
     @Test
     @Timeout(120)
     public void testRandomCdcEvents() throws Exception {
+        innerTestRandomCdcEvents(() -> ThreadLocalRandom.current().nextInt(5) 
+ 1); // 1..5 buckets
+    }
+
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEventsDynamicBucket() throws Exception {
+        innerTestRandomCdcEvents(() -> -1); // bucket = -1: dynamic bucket mode
+    }
+
+    private void innerTestRandomCdcEvents(Supplier<Integer> bucket) throws 
Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
         int numTables = random.nextInt(3) + 1;
@@ -74,7 +85,6 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
         int maxSchemaChanges = 10;
         int maxPartitions = 3;
         int maxKeys = 150;
-        int maxBuckets = 5;
 
         String failingName = UUID.randomUUID().toString();
 
@@ -121,7 +131,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
                             testTable.initialRowType(),
                             Collections.singletonList("pt"),
                             Arrays.asList("pt", "k"),
-                            random.nextInt(maxBuckets) + 1);
+                            bucket.get());
             fileStoreTables.add(fileStoreTable);
         }
 
@@ -177,6 +187,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
             throws Exception {
         Options conf = new Options();
         conf.set(CoreOptions.BUCKET, numBucket);
+        conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 100L);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index fac8cb761..e47cb03d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -60,13 +60,22 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
     @Test
     @Timeout(120)
     public void testRandomCdcEvents() throws Exception {
+        innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1); // 1..5 buckets
+    }
+
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEventsDynamicBucket() throws Exception {
+        innerTestRandomCdcEvents(-1); // bucket = -1: dynamic bucket mode
+    }
+
+    private void innerTestRandomCdcEvents(int numBucket) throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
         int numEvents = random.nextInt(1500) + 1;
         int numSchemaChanges = Math.min(numEvents / 2, random.nextInt(10) + 1);
         int numPartitions = random.nextInt(3) + 1;
         int numKeys = random.nextInt(150) + 1;
-        int numBucket = random.nextInt(5) + 1;
         boolean enableFailure = random.nextBoolean();
 
         TestTable testTable =
@@ -141,6 +150,7 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
             throws Exception {
         Options conf = new Options();
         conf.set(CoreOptions.BUCKET, numBucket);
+        conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 100L);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
 

Reply via email to