This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 64f996b50 [FLINK-35891][pipeline-connector][paimon] Support dynamic bucket in Paimon sink
64f996b50 is described below

commit 64f996b504fe9b5b250e1806c9b98de8df8bd01d
Author: Kunni <[email protected]>
AuthorDate: Mon Aug 12 14:46:01 2024 +0800

    [FLINK-35891][pipeline-connector][paimon] Support dynamic bucket in Paimon sink
    
    This closes  #3499.
---
 .../org/apache/flink/cdc/common/sink/DataSink.java |   6 +
 .../cdc/composer/flink/FlinkPipelineComposer.java  |   2 +-
 .../cdc/connectors/paimon/sink/PaimonDataSink.java |  24 ++-
 .../paimon/sink/PaimonDataSinkFactory.java         |  12 +-
 .../connectors/paimon/sink/PaimonHashFunction.java |  70 +++++++
 .../paimon/sink/PaimonHashFunctionProvider.java    |  51 +++++
 .../paimon/sink/v2/OperatorIDGenerator.java        |  57 ++++++
 .../cdc/connectors/paimon/sink/v2/PaimonEvent.java |  17 ++
 .../connectors/paimon/sink/v2/PaimonEventSink.java |  74 +++++++
 .../sink/v2/PaimonRecordEventSerializer.java       |  13 +-
 .../cdc/connectors/paimon/sink/v2/PaimonSink.java  |   4 +-
 .../connectors/paimon/sink/v2/PaimonWriter.java    |  28 +--
 .../paimon/sink/v2/StoreSinkWriteImpl.java         |   2 +-
 .../sink/v2/bucket/BucketAssignOperator.java       | 226 +++++++++++++++++++++
 .../paimon/sink/v2/bucket/BucketWrapper.java       |  28 +++
 .../sink/v2/bucket/BucketWrapperChangeEvent.java   |  77 +++++++
 .../v2/bucket/BucketWrapperEventSerializer.java    | 128 ++++++++++++
 .../sink/v2/bucket/BucketWrapperEventTypeInfo.java |  85 ++++++++
 .../sink/v2/bucket/BucketWrapperFlushEvent.java    |  64 ++++++
 .../paimon/sink/PaimonHashFunctionTest.java        | 137 +++++++++++++
 20 files changed, 1074 insertions(+), 31 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
index f565f7b3d..46f91ddff 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
@@ -41,4 +41,10 @@ public interface DataSink {
     default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
         return new DefaultDataChangeEventHashFunctionProvider();
     }
+
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        return getDataChangeEventHashFunctionProvider(); // fallback to nullary version if it isn't
+        // overridden
+    }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index ca4378ad1..bddd5fc00 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -146,7 +146,7 @@ public class FlinkPipelineComposer implements PipelineComposer {
                         parallelism,
                         parallelism,
                         schemaOperatorIDGenerator.generate(),
-                        dataSink.getDataChangeEventHashFunctionProvider());
+                        dataSink.getDataChangeEventHashFunctionProvider(parallelism));
 
         // Build Sink Operator
         sinkTranslator.translate(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 6c94a3a74..5a95f1efd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -17,18 +17,21 @@
 
 package org.apache.flink.cdc.connectors.paimon.sink;
 
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
-import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonSink;
 
 import org.apache.paimon.options.Options;
 
 import java.io.Serializable;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 
@@ -47,26 +50,41 @@ public class PaimonDataSink implements DataSink, Serializable {
 
     private final PaimonRecordSerializer<Event> serializer;
 
+    private final ZoneId zoneId;
+
+    public final String schemaOperatorUid;
+
     public PaimonDataSink(
             Options options,
             Map<String, String> tableOptions,
             String commitUser,
             Map<TableId, List<String>> partitionMaps,
-            PaimonRecordSerializer<Event> serializer) {
+            PaimonRecordSerializer<Event> serializer,
+            ZoneId zoneId,
+            String schemaOperatorUid) {
         this.options = options;
         this.tableOptions = tableOptions;
         this.commitUser = commitUser;
         this.partitionMaps = partitionMaps;
         this.serializer = serializer;
+        this.zoneId = zoneId;
+        this.schemaOperatorUid = schemaOperatorUid;
     }
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
-        return FlinkSinkProvider.of(new PaimonSink<>(options, commitUser, serializer));
+        return FlinkSinkProvider.of(
+                new PaimonEventSink(options, commitUser, serializer, schemaOperatorUid, zoneId));
     }
 
     @Override
     public MetadataApplier getMetadataApplier() {
         return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
     }
+
+    @Override
+    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        return new PaimonHashFunctionProvider(options, zoneId, parallelism);
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index cf207325d..302ba629a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -104,7 +104,17 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
             }
         }
         PaimonRecordSerializer<Event> serializer = new PaimonRecordEventSerializer(zoneId);
-        return new PaimonDataSink(options, tableOptions, commitUser, partitionMaps, serializer);
+        String schemaOperatorUid =
+                context.getPipelineConfiguration()
+                        .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
+        return new PaimonDataSink(
+                options,
+                tableOptions,
+                commitUser,
+                partitionMaps,
+                serializer,
+                zoneId,
+                schemaOperatorUid);
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
new file mode 100644
index 000000000..b01ad39f2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.List;
+
+/**
+ * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle {@link DataChangeEvent}
+ * by hash of PrimaryKey.
+ */
+public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<RecordData.FieldGetter> fieldGetters;
+
+    private final RowAssignerChannelComputer channelComputer;
+
+    public PaimonHashFunction(
+            Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
+        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
+        channelComputer.setup(parallelism);
+    }
+
+    @Override
+    public int hashcode(DataChangeEvent event) {
+        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
+        return channelComputer.channel(genericRow);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
new file mode 100644
index 000000000..5f641f409
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import org.apache.paimon.options.Options;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+
+/** A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}. */
+public class PaimonHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {
+
+    private final Options options;
+
+    private final ZoneId zoneId;
+
+    private final int parallelism;
+
+    public PaimonHashFunctionProvider(Options options, ZoneId zoneId, int parallelism) {
+        this.options = options;
+        this.zoneId = zoneId;
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
+        return new PaimonHashFunction(options, tableId, schema, zoneId, parallelism);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java
new file mode 100644
index 000000000..125fb17f1
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import org.apache.flink.shaded.guava31.com.google.common.hash.Hashing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Generating {@link OperatorID} for communication between Flink operators. */
+@Internal
+public class OperatorIDGenerator {
+    private final String transformationUid;
+
+    public OperatorIDGenerator(String transformationUid) {
+        this.transformationUid = transformationUid;
+    }
+
+    /**
+     * Generate {@link OperatorID}.
+     *
+     * <p>Operator ID generation is an internal implementation inside Flink, happening during the
+     * stream graph generating phase, so our algorithm of generating operator ID should be exactly
+     * the same as in Flink, in order to make sure that operators can reach out each other on the
+     * cluster.
+     *
+     * @see
+     *     org.apache.flink.streaming.api.graph.StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes
+     *     the algorithm of generating operator ID in Flink
+     */
+    public OperatorID generate() {
+        byte[] hash =
+                Hashing.murmur3_128(0)
+                        .newHasher()
+                        .putString(transformationUid, UTF_8)
+                        .hash()
+                        .asBytes();
+        return new OperatorID(hash);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
index eb136aae7..d23ca7e76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
@@ -31,6 +31,7 @@ public class PaimonEvent {
 
     // if true, means that table schema has changed right before this genericRow.
     boolean shouldRefreshSchema;
+    int bucket;
 
     public PaimonEvent(Identifier tableId, GenericRow genericRow) {
         this.tableId = tableId;
@@ -44,6 +45,14 @@ public class PaimonEvent {
         this.shouldRefreshSchema = shouldRefreshSchema;
     }
 
+    public PaimonEvent(
+            Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema, int bucket) {
+        this.tableId = tableId;
+        this.genericRow = genericRow;
+        this.shouldRefreshSchema = shouldRefreshSchema;
+        this.bucket = bucket;
+    }
+
     public Identifier getTableId() {
         return tableId;
     }
@@ -67,4 +76,12 @@ public class PaimonEvent {
     public void setGenericRow(GenericRow genericRow) {
         this.genericRow = genericRow;
     }
+
+    public int getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(int bucket) {
+        this.bucket = bucket;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
new file mode 100644
index 000000000..d6837915f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+
+import java.time.ZoneId;
+
+/** A {@link PaimonSink} to process {@link Event}. */
+public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTopology<Event> {
+
+    public final String schemaOperatorUid;
+
+    public final ZoneId zoneId;
+
+    public PaimonEventSink(
+            Options catalogOptions,
+            String commitUser,
+            PaimonRecordSerializer<Event> serializer,
+            String schemaOperatorUid,
+            ZoneId zoneId) {
+        super(catalogOptions, commitUser, serializer);
+        this.schemaOperatorUid = schemaOperatorUid;
+        this.zoneId = zoneId;
+    }
+
+    @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
+        // Shuffle by key hash => Assign bucket => Shuffle by bucket.
+        return dataStream
+                .transform(
+                        "BucketAssign",
+                        new BucketWrapperEventTypeInfo(),
+                        new BucketAssignOperator(
+                                catalogOptions, schemaOperatorUid, zoneId, commitUser))
+                .name("Assign Bucket")
+                // All Events after BucketAssignOperator are decorated with BucketWrapper.
+                .partitionCustom(
+                        (bucket, numPartitions) -> bucket % numPartitions,
+                        (event) -> ((BucketWrapper) event).getBucket());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MultiTableCommittable> getCommittableSerializer() {
+        CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
+        return new MultiTableCommittableSerializer(fileSerializer);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
index 53b63f3b5..c3ceb31ac 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
@@ -51,10 +52,12 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer<Event
 
     @Override
     public PaimonEvent serialize(Event event) {
-        Identifier tableId =
-                Identifier.create(
-                        ((ChangeEvent) event).tableId().getSchemaName(),
-                        ((ChangeEvent) event).tableId().getTableName());
+        int bucket = 0;
+        if (event instanceof BucketWrapperChangeEvent) {
+            bucket = ((BucketWrapperChangeEvent) event).getBucket();
+            event = ((BucketWrapperChangeEvent) event).getInnerEvent();
+        }
+        Identifier tableId = Identifier.fromString(((ChangeEvent) event).tableId().toString());
         if (event instanceof SchemaChangeEvent) {
             if (event instanceof CreateTableEvent) {
                 CreateTableEvent createTableEvent = (CreateTableEvent) event;
@@ -78,7 +81,7 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer<Event
                     PaimonWriterHelper.convertEventToGenericRow(
                             dataChangeEvent,
                             schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
-            return new PaimonEvent(tableId, genericRow);
+            return new PaimonEvent(tableId, genericRow, false, bucket);
         } else {
             throw new IllegalArgumentException(
                     "failed to convert Input into PaimonEvent, unsupported event: " + event);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
index 5e310b2c3..61824ec44 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
@@ -40,9 +40,9 @@ public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, MultiTa
     // provided a default commit user.
     public static final String DEFAULT_COMMIT_USER = "admin";
 
-    private final Options catalogOptions;
+    protected final Options catalogOptions;
 
-    private final String commitUser;
+    protected final String commitUser;
 
     private final PaimonRecordSerializer<InputT> serializer;
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index 1b51535b2..87229f36c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 
@@ -134,7 +133,7 @@ public class PaimonWriter<InputT>
                                 return storeSinkWrite;
                             });
             try {
-                write.write(paimonEvent.getGenericRow());
+                write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket());
             } catch (Exception e) {
                 throw new IOException(e);
             }
@@ -142,22 +141,15 @@ public class PaimonWriter<InputT>
     }
 
     private FileStoreTable getTable(Identifier tableId) {
-        FileStoreTable table =
-                tables.computeIfAbsent(
-                        tableId,
-                        id -> {
-                            try {
-                                return (FileStoreTable) catalog.getTable(tableId);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-
-        if (table.bucketMode() != BucketMode.FIXED) {
-            throw new UnsupportedOperationException(
-                    "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode());
-        }
-        return table;
+        return tables.computeIfAbsent(
+                tableId,
+                id -> {
+                    try {
+                        return (FileStoreTable) catalog.getTable(tableId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     /**
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index 3d49086b2..941189a0a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -148,7 +148,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     @Override
     public SinkRecord write(InternalRow internalRow, int i) throws Exception {
-        return write.writeAndReturn(internalRow);
+        return write.writeAndReturn(internalRow, i);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
new file mode 100644
index 000000000..07509574c
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.index.BucketAssigner;
+import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Assign bucket for every given {@link DataChangeEvent}. */
+public class BucketAssignOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event> {
+
+    public final String commitUser;
+
+    private final Options catalogOptions;
+
+    private Catalog catalog;
+
+    Map<TableId, Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, 
RowPartitionKeyExtractor>>
+            bucketAssignerMap;
+
+    // maintain the latest schema of tableId.
+    private Map<TableId, TableSchemaInfo> schemaMaps;
+
+    private int totalTasksNumber;
+
+    private int currentTaskNumber;
+
+    public final String schemaOperatorUid;
+
+    private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    private final ZoneId zoneId;
+
+    public BucketAssignOperator(
+            Options catalogOptions, String schemaOperatorUid, ZoneId zoneId, 
String commitUser) {
+        this.catalogOptions = catalogOptions;
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.schemaOperatorUid = schemaOperatorUid;
+        this.commitUser = commitUser;
+        this.zoneId = zoneId;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        this.bucketAssignerMap = new HashMap<>();
+        this.totalTasksNumber = 
getRuntimeContext().getNumberOfParallelSubtasks();
+        this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
+        this.schemaMaps = new HashMap<>();
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Event>> output) {
+        super.setup(containingTask, config, output);
+        TaskOperatorEventGateway toCoordinator =
+                
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
+        schemaEvolutionClient =
+                new SchemaEvolutionClient(
+                        toCoordinator, new 
OperatorIDGenerator(schemaOperatorUid).generate());
+    }
+
+    @Override
+    public void processElement(StreamRecord<Event> streamRecord) throws 
Exception {
+        Event event = streamRecord.getValue();
+        if (event instanceof FlushEvent) {
+            output.collect(
+                    new StreamRecord<>(
+                            new BucketWrapperFlushEvent(
+                                    currentTaskNumber, ((FlushEvent) 
event).getTableId())));
+            return;
+        }
+
+        if (event instanceof DataChangeEvent) {
+            DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+            if (schemaMaps.containsKey(dataChangeEvent.tableId())) {
+                Optional<Schema> schema =
+                        
schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId());
+                if (schema.isPresent()) {
+                    schemaMaps.put(
+                            dataChangeEvent.tableId(), new 
TableSchemaInfo(schema.get(), zoneId));
+                } else {
+                    throw new RuntimeException(
+                            "Could not find schema message from SchemaRegistry 
for "
+                                    + dataChangeEvent.tableId());
+                }
+            }
+            Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, 
RowPartitionKeyExtractor> tuple4 =
+                    bucketAssignerMap.computeIfAbsent(
+                            dataChangeEvent.tableId(), this::getTableInfo);
+            int bucket;
+            GenericRow genericRow =
+                    PaimonWriterHelper.convertEventToGenericRow(
+                            dataChangeEvent,
+                            
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
+            switch (tuple4.f0) {
+                case DYNAMIC:
+                    {
+                        bucket =
+                                tuple4.f2.assign(
+                                        tuple4.f3.partition(genericRow),
+                                        
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
+                        break;
+                    }
+                case FIXED:
+                    {
+                        tuple4.f1.setRecord(genericRow);
+                        bucket = tuple4.f1.bucket();
+                        break;
+                    }
+                case UNAWARE:
+                    {
+                        bucket = 0;
+                        break;
+                    }
+                case GLOBAL_DYNAMIC:
+                default:
+                    {
+                        throw new RuntimeException("Unsupported bucket mode: " 
+ tuple4.f0);
+                    }
+            }
+            output.collect(
+                    new StreamRecord<>(new BucketWrapperChangeEvent(bucket, 
(ChangeEvent) event)));
+        } else if (event instanceof CreateTableEvent) {
+            CreateTableEvent createTableEvent = (CreateTableEvent) event;
+            schemaMaps.put(
+                    createTableEvent.tableId(),
+                    new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
+            output.collect(
+                    new StreamRecord<>(
+                            new BucketWrapperChangeEvent(currentTaskNumber, 
(ChangeEvent) event)));
+        } else if (event instanceof SchemaChangeEvent) {
+            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+            Schema schema =
+                    SchemaUtils.applySchemaChangeEvent(
+                            
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
+                            schemaChangeEvent);
+            schemaMaps.put(schemaChangeEvent.tableId(), new 
TableSchemaInfo(schema, zoneId));
+            output.collect(
+                    new StreamRecord<>(
+                            new BucketWrapperChangeEvent(currentTaskNumber, 
(ChangeEvent) event)));
+        }
+    }
+
+    private Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, 
RowPartitionKeyExtractor>
+            getTableInfo(TableId tableId) {
+        Preconditions.checkNotNull(tableId, "Invalid tableId in given event.");
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId.toString()));
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
+        Integer numAssigners = 
table.coreOptions().dynamicBucketInitialBuckets();
+        return new Tuple4<>(
+                table.bucketMode(),
+                table.createRowKeyExtractor(),
+                new HashBucketAssigner(
+                        table.snapshotManager(),
+                        commitUser,
+                        table.store().newIndexFileHandler(),
+                        totalTasksNumber,
+                        MathUtils.min(numAssigners, totalTasksNumber),
+                        currentTaskNumber,
+                        targetRowNum),
+                new RowPartitionKeyExtractor(table.schema()));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
new file mode 100644
index 000000000..ed56cee97
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+/**
+ * Wrapper class with bucket: implemented by events that carry a bucket id, such as {@link
+ * BucketWrapperChangeEvent} and {@link BucketWrapperFlushEvent}.
+ */
+public interface BucketWrapper {
+
+    /**
+     * Returns the bucket id carried by this event, intended as the bucket argument of {@link
+     * StoreSinkWrite#write(InternalRow, int)}.
+     */
+    int getBucket();
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
new file mode 100644
index 000000000..6c0b3c155
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A wrapper class for {@link ChangeEvent} to attach a bucket id.
+ *
+ * <p>{@link #tableId()} delegates to the wrapped event; equality and hashing consider both the
+ * bucket and the wrapped event.
+ */
+public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Bucket id attached to the wrapped event. */
+    private final int bucket;
+
+    /** The wrapped change event. */
+    private final ChangeEvent innerEvent;
+
+    /**
+     * Creates a wrapper around {@code innerEvent}.
+     *
+     * @param bucket the bucket id to attach
+     * @param innerEvent the change event being wrapped
+     */
+    public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) {
+        this.bucket = bucket;
+        this.innerEvent = innerEvent;
+    }
+
+    @Override
+    public int getBucket() {
+        return bucket;
+    }
+
+    /** Returns the wrapped change event. */
+    public ChangeEvent getInnerEvent() {
+        return innerEvent;
+    }
+
+    @Override
+    public TableId tableId() {
+        return innerEvent.tableId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o;
+        return bucket == that.bucket && Objects.equals(innerEvent, that.innerEvent);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucket, innerEvent);
+    }
+
+    @Override
+    public String toString() {
+        return "BucketWrapperChangeEvent{"
+                + "bucket="
+                + bucket
+                + ", innerEvent="
+                + innerEvent
+                + '}';
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
new file mode 100644
index 000000000..f37d9952f
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
+import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/** A {@link TypeSerializerSingleton} for {@link BucketWrapperChangeEvent}. */
+public class BucketWrapperEventSerializer extends 
TypeSerializerSingleton<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final EnumSerializer<EventClass> enumSerializer =
+            new EnumSerializer<>(EventClass.class);
+
+    private final EventSerializer eventSerializer = EventSerializer.INSTANCE;
+
+    private final TableIdSerializer tableIdSerializer = 
TableIdSerializer.INSTANCE;
+
+    /** Sharable instance of the TableIdSerializer. */
+    public static final BucketWrapperEventSerializer INSTANCE = new 
BucketWrapperEventSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public Event createInstance() {
+        return new Event() {};
+    }
+
+    @Override
+    public Event copy(Event event) {
+        return event;
+    }
+
+    @Override
+    public Event copy(Event from, Event reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return 0;
+    }
+
+    @Override
+    public void serialize(Event event, DataOutputView dataOutputView) throws 
IOException {
+        if (event instanceof BucketWrapperChangeEvent) {
+            BucketWrapperChangeEvent bucketWrapperChangeEvent = 
(BucketWrapperChangeEvent) event;
+            enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT, 
dataOutputView);
+            dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
+            
eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(), 
dataOutputView);
+        } else if (event instanceof BucketWrapperFlushEvent) {
+            enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT, 
dataOutputView);
+            BucketWrapperFlushEvent bucketWrapperFlushEvent = 
(BucketWrapperFlushEvent) event;
+            dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
+            tableIdSerializer.serialize(bucketWrapperFlushEvent.getTableId(), 
dataOutputView);
+        }
+    }
+
+    @Override
+    public Event deserialize(DataInputView source) throws IOException {
+        EventClass eventClass = enumSerializer.deserialize(source);
+        if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
+            return new BucketWrapperFlushEvent(
+                    source.readInt(), tableIdSerializer.deserialize(source));
+        } else {
+            return new BucketWrapperChangeEvent(
+                    source.readInt(), (ChangeEvent) 
eventSerializer.deserialize(source));
+        }
+    }
+
+    @Override
+    public Event deserialize(Event reuse, DataInputView source) throws 
IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<Event> snapshotConfiguration() {
+        return new EventSerializerSnapshot();
+    }
+
+    /** Serializer configuration snapshot for compatibility and format 
evolution. */
+    @SuppressWarnings("WeakerAccess")
+    public static final class EventSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<Event> {
+
+        public EventSerializerSnapshot() {
+            super(() -> INSTANCE);
+        }
+    }
+
+    enum EventClass {
+        BUCKET_WRAPPER_CHANGE_EVENT,
+        BUCKET_WRAPPER_FLUSH_EVENT
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
new file mode 100644
index 000000000..be1c54bfa
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEvent;
+
+/**
+ * A {@link TypeInformation} for the bucket-wrapped {@link Event}s emitted by {@link
+ * BucketAssignOperator}, i.e. {@link BucketWrapperChangeEvent} and {@link BucketWrapperFlushEvent}.
+ *
+ * <p>Records are serialized by {@link BucketWrapperEventSerializer}; this type does not describe
+ * {@link PaimonEvent}.
+ */
+public class BucketWrapperEventTypeInfo extends TypeInformation<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 0;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<Event> getTypeClass() {
+        return Event.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    /** Returns the shared {@link BucketWrapperEventSerializer} instance. */
+    @Override
+    public TypeSerializer<Event> createSerializer(ExecutionConfig config) {
+        return BucketWrapperEventSerializer.INSTANCE;
+    }
+
+    @Override
+    public String toString() {
+        return "BucketWrapperEvent";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof BucketWrapperEventTypeInfo;
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof BucketWrapperEventTypeInfo;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
new file mode 100644
index 000000000..046a22a31
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.util.Objects;
+
+/** A wrapper class for {@link FlushEvent} to attach bucket id. */
+public class BucketWrapperFlushEvent extends FlushEvent implements 
BucketWrapper {
+
+    private final int bucket;
+
+    public BucketWrapperFlushEvent(int bucket, TableId tableId) {
+        super(tableId);
+        this.bucket = bucket;
+    }
+
+    @Override
+    public int getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o;
+        return bucket == that.bucket;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), bucket);
+    }
+
+    @Override
+    public String toString() {
+        return "BucketWrapperFlushEvent{tableId=" + getTableId() + ", bucket=" 
+ bucket + '}';
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
new file mode 100644
index 000000000..86ff2f7b2
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PaimonHashFunction}. */
+public class PaimonHashFunctionTest {
+
+    /**
+     * Static, so JUnit shares one temporary directory across this class; each test creates its
+     * own randomly named warehouse sub-directory inside it.
+     */
+    @TempDir public static Path temporaryFolder;
+
+    private static final String TEST_DATABASE = "test";
+
+    private Catalog catalog;
+
+    private Options catalogOptions;
+
+    /** Creates a Paimon catalog on a fresh warehouse directory and ensures the test database. */
+    @BeforeEach
+    public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+        File warehouseDir = new File(temporaryFolder.toFile(), UUID.randomUUID().toString());
+        catalogOptions = new Options();
+        catalogOptions.setString("warehouse", warehouseDir.toString());
+        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        catalog.createDatabase(TEST_DATABASE, true);
+    }
+
+    /** Drops the test database and closes the catalog. */
+    @AfterEach
+    public void afterEach() throws Exception {
+        catalog.dropDatabase(TEST_DATABASE, true, true);
+        catalog.close();
+    }
+
+    /**
+     * For a table with a fixed bucket count, insert, update and delete events that share the same
+     * primary key ({@code col1}, {@code pt}) must yield the same hash code, even though the
+     * non-key column {@code col2} differs between them.
+     */
+    @Test
+    public void testHashCodeForFixedBucketTable() {
+        TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "10");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING().notNull())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .physicalColumn("pt", DataTypes.STRING())
+                        .primaryKey("col1", "pt")
+                        .partitionKey("pt")
+                        .build();
+        metadataApplier.applySchemaChange(new CreateTableEvent(tableId, schema));
+
+        DataType[] columnTypes = schema.getColumnDataTypes().toArray(new DataType[0]);
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(columnTypes);
+        PaimonHashFunction hashFunction =
+                new PaimonHashFunction(catalogOptions, tableId, schema, ZoneId.systemDefault(), 4);
+
+        // The key (col1 = "1", pt = "2024") never changes; only col2 does.
+        int insertHash =
+                hashFunction.hashcode(
+                        DataChangeEvent.insertEvent(
+                                tableId, generator.generate(fields("1", "1", "2024"))));
+        int updateHash =
+                hashFunction.hashcode(
+                        DataChangeEvent.updateEvent(
+                                tableId,
+                                generator.generate(fields("1", "1", "2024")),
+                                generator.generate(fields("1", "2", "2024"))));
+        int deleteHash =
+                hashFunction.hashcode(
+                        DataChangeEvent.deleteEvent(
+                                tableId, generator.generate(fields("1", "2", "2024"))));
+
+        assertThat(insertHash).isEqualTo(updateHash).isEqualTo(deleteHash);
+    }
+
+    /** Wraps each value, in column order, as {@link BinaryStringData} for the row generator. */
+    private static Object[] fields(String... values) {
+        Object[] converted = new Object[values.length];
+        for (int i = 0; i < values.length; i++) {
+            converted[i] = BinaryStringData.fromString(values[i]);
+        }
+        return converted;
+    }
+}

Reply via email to