This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 64f996b50 [FLINK-35891][pipeline-connector][paimon] Support dynamic
bucket in Paimon sink
64f996b50 is described below
commit 64f996b504fe9b5b250e1806c9b98de8df8bd01d
Author: Kunni <[email protected]>
AuthorDate: Mon Aug 12 14:46:01 2024 +0800
[FLINK-35891][pipeline-connector][paimon] Support dynamic bucket in Paimon
sink
This closes #3499.
---
.../org/apache/flink/cdc/common/sink/DataSink.java | 6 +
.../cdc/composer/flink/FlinkPipelineComposer.java | 2 +-
.../cdc/connectors/paimon/sink/PaimonDataSink.java | 24 ++-
.../paimon/sink/PaimonDataSinkFactory.java | 12 +-
.../connectors/paimon/sink/PaimonHashFunction.java | 70 +++++++
.../paimon/sink/PaimonHashFunctionProvider.java | 51 +++++
.../paimon/sink/v2/OperatorIDGenerator.java | 57 ++++++
.../cdc/connectors/paimon/sink/v2/PaimonEvent.java | 17 ++
.../connectors/paimon/sink/v2/PaimonEventSink.java | 74 +++++++
.../sink/v2/PaimonRecordEventSerializer.java | 13 +-
.../cdc/connectors/paimon/sink/v2/PaimonSink.java | 4 +-
.../connectors/paimon/sink/v2/PaimonWriter.java | 28 +--
.../paimon/sink/v2/StoreSinkWriteImpl.java | 2 +-
.../sink/v2/bucket/BucketAssignOperator.java | 226 +++++++++++++++++++++
.../paimon/sink/v2/bucket/BucketWrapper.java | 28 +++
.../sink/v2/bucket/BucketWrapperChangeEvent.java | 77 +++++++
.../v2/bucket/BucketWrapperEventSerializer.java | 128 ++++++++++++
.../sink/v2/bucket/BucketWrapperEventTypeInfo.java | 85 ++++++++
.../sink/v2/bucket/BucketWrapperFlushEvent.java | 64 ++++++
.../paimon/sink/PaimonHashFunctionTest.java | 137 +++++++++++++
20 files changed, 1074 insertions(+), 31 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
index f565f7b3d..46f91ddff 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
@@ -41,4 +41,10 @@ public interface DataSink {
default HashFunctionProvider<DataChangeEvent>
getDataChangeEventHashFunctionProvider() {
return new DefaultDataChangeEventHashFunctionProvider();
}
+
+    /**
+     * Returns the {@link HashFunctionProvider} used to partition {@link DataChangeEvent}s for this
+     * sink, given the number of downstream subtasks the events are partitioned into.
+     *
+     * <p>Sinks whose partitioning depends on that parallelism should override this variant. The
+     * default implementation ignores {@code parallelism} and delegates to {@link
+     * #getDataChangeEventHashFunctionProvider()}.
+     *
+     * @param parallelism number of downstream subtasks the events are partitioned into
+     * @return the hash function provider for data change events
+     */
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        // Fall back to the nullary version if this one isn't overridden.
+        return getDataChangeEventHashFunctionProvider();
+    }
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index ca4378ad1..bddd5fc00 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -146,7 +146,7 @@ public class FlinkPipelineComposer implements
PipelineComposer {
parallelism,
parallelism,
schemaOperatorIDGenerator.generate(),
- dataSink.getDataChangeEventHashFunctionProvider());
+
dataSink.getDataChangeEventHashFunctionProvider(parallelism));
// Build Sink Operator
sinkTranslator.translate(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 6c94a3a74..5a95f1efd 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -17,18 +17,21 @@
package org.apache.flink.cdc.connectors.paimon.sink;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
-import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonSink;
import org.apache.paimon.options.Options;
import java.io.Serializable;
+import java.time.ZoneId;
import java.util.List;
import java.util.Map;
@@ -47,26 +50,41 @@ public class PaimonDataSink implements DataSink,
Serializable {
private final PaimonRecordSerializer<Event> serializer;
+ private final ZoneId zoneId;
+
+ public final String schemaOperatorUid;
+
public PaimonDataSink(
Options options,
Map<String, String> tableOptions,
String commitUser,
Map<TableId, List<String>> partitionMaps,
- PaimonRecordSerializer<Event> serializer) {
+ PaimonRecordSerializer<Event> serializer,
+ ZoneId zoneId,
+ String schemaOperatorUid) {
this.options = options;
this.tableOptions = tableOptions;
this.commitUser = commitUser;
this.partitionMaps = partitionMaps;
this.serializer = serializer;
+ this.zoneId = zoneId;
+ this.schemaOperatorUid = schemaOperatorUid;
}
@Override
public EventSinkProvider getEventSinkProvider() {
- return FlinkSinkProvider.of(new PaimonSink<>(options, commitUser,
serializer));
+ return FlinkSinkProvider.of(
+ new PaimonEventSink(options, commitUser, serializer,
schemaOperatorUid, zoneId));
}
@Override
public MetadataApplier getMetadataApplier() {
return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
}
+
+    /**
+     * Supplies a {@link PaimonHashFunctionProvider}, so that data change events are partitioned
+     * with Paimon's channel computation for the given number of downstream subtasks.
+     */
+    @Override
+    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        final int numChannels = parallelism;
+        return new PaimonHashFunctionProvider(options, zoneId, numChannels);
+    }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index cf207325d..302ba629a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -104,7 +104,17 @@ public class PaimonDataSinkFactory implements
DataSinkFactory {
}
}
PaimonRecordSerializer<Event> serializer = new
PaimonRecordEventSerializer(zoneId);
- return new PaimonDataSink(options, tableOptions, commitUser,
partitionMaps, serializer);
+ String schemaOperatorUid =
+ context.getPipelineConfiguration()
+ .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
+ return new PaimonDataSink(
+ options,
+ tableOptions,
+ commitUser,
+ partitionMaps,
+ serializer,
+ zoneId,
+ schemaOperatorUid);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
new file mode 100644
index 000000000..b01ad39f2
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.List;
+
+/**
+ * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffles {@link
+ * DataChangeEvent}s by hash of the primary key, delegating the channel choice to Paimon's {@link
+ * RowAssignerChannelComputer}.
+ */
+public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Getters used to convert a {@link DataChangeEvent} into a Paimon {@link GenericRow}. */
+    private final List<RecordData.FieldGetter> fieldGetters;
+
+    /** Paimon channel computer, set up for {@code parallelism} downstream channels. */
+    private final RowAssignerChannelComputer channelComputer;
+
+    /**
+     * Creates a hash function for a single Paimon table.
+     *
+     * @param options Paimon catalog options used to look up the table
+     * @param tableId the table whose events are hashed; must not be {@code null}
+     * @param schema the CDC schema of the table, used to build the field getters
+     * @param zoneId time zone passed to the field getters (presumably for temporal conversion)
+     * @param parallelism number of downstream channels to hash into
+     * @throws NullPointerException if {@code tableId} is {@code null}
+     * @throws RuntimeException if the table cannot be loaded or the catalog cannot be closed
+     */
+    public PaimonHashFunction(
+            Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        if (tableId == null) {
+            // HashFunctionProvider#getHashFunction accepts a nullable TableId, but a concrete
+            // Paimon table is required to compute channels.
+            throw new NullPointerException(
+                    "tableId must not be null when creating a PaimonHashFunction");
+        }
+        FileStoreTable table;
+        // The catalog is only needed to resolve the table; close it so it is not leaked.
+        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
+            table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load Paimon table " + tableId, e);
+        }
+        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
+        this.channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
+        this.channelComputer.setup(parallelism);
+    }
+
+    /** Converts the event into a Paimon row and returns the channel Paimon assigns to it. */
+    @Override
+    public int hashcode(DataChangeEvent event) {
+        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
+        return channelComputer.channel(genericRow);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
new file mode 100644
index 000000000..5f641f409
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import org.apache.paimon.options.Options;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+
+/**
+ * A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}. Each call creates a
+ * new {@link PaimonHashFunction} from the stored catalog options, time zone and parallelism.
+ */
+public class PaimonHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {
+
+    /** Paimon catalog options handed to every created hash function. */
+    private final Options options;
+
+    /** Time zone handed to every created hash function. */
+    private final ZoneId zoneId;
+
+    /** Number of downstream channels every created hash function maps into. */
+    private final int parallelism;
+
+    public PaimonHashFunctionProvider(Options catalogOptions, ZoneId timeZone, int numChannels) {
+        this.options = catalogOptions;
+        this.zoneId = timeZone;
+        this.parallelism = numChannels;
+    }
+
+    /**
+     * Builds a {@link PaimonHashFunction} for {@code tableId}. NOTE: the created function
+     * dereferences {@code tableId}, so a {@code null} id fails there.
+     */
+    @Override
+    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
+        final Options catalogOptions = this.options;
+        return new PaimonHashFunction(catalogOptions, tableId, schema, zoneId, parallelism);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java
new file mode 100644
index 000000000..125fb17f1
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import org.apache.flink.shaded.guava31.com.google.common.hash.Hashing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Derives the {@link OperatorID} of a transformation from its UID, mirroring Flink. */
+@Internal
+public class OperatorIDGenerator {
+
+    /** UID of the transformation whose operator ID is derived. */
+    private final String transformationUid;
+
+    public OperatorIDGenerator(String transformationUid) {
+        this.transformationUid = transformationUid;
+    }
+
+    /**
+     * Computes the {@link OperatorID} for the configured transformation UID.
+     *
+     * <p>Flink assigns operator IDs internally while generating the stream graph. This method
+     * must reproduce that algorithm exactly (murmur3_128 with seed 0 over the UTF-8 encoded UID),
+     * so that operators can locate each other on the cluster.
+     *
+     * @see
+     *     org.apache.flink.streaming.api.graph.StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes
+     *     the algorithm of generating operator ID in Flink
+     */
+    public OperatorID generate() {
+        return new OperatorID(
+                Hashing.murmur3_128(0)
+                        .newHasher()
+                        .putString(transformationUid, UTF_8)
+                        .hash()
+                        .asBytes());
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
index eb136aae7..d23ca7e76 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
@@ -31,6 +31,7 @@ public class PaimonEvent {
// if true, means that table schema has changed right before this
genericRow.
boolean shouldRefreshSchema;
+ int bucket;
public PaimonEvent(Identifier tableId, GenericRow genericRow) {
this.tableId = tableId;
@@ -44,6 +45,14 @@ public class PaimonEvent {
this.shouldRefreshSchema = shouldRefreshSchema;
}
+    /**
+     * Creates an event carrying {@code genericRow} for table {@code tableId}, targeted at the
+     * given {@code bucket}.
+     *
+     * @param tableId Paimon identifier of the target table
+     * @param genericRow the row to write
+     * @param shouldRefreshSchema whether the table schema changed right before this row
+     * @param bucket the Paimon bucket the row is written to
+     */
+    public PaimonEvent(
+            Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema, int bucket) {
+        this.bucket = bucket;
+        this.shouldRefreshSchema = shouldRefreshSchema;
+        this.genericRow = genericRow;
+        this.tableId = tableId;
+    }
+
public Identifier getTableId() {
return tableId;
}
@@ -67,4 +76,12 @@ public class PaimonEvent {
public void setGenericRow(GenericRow genericRow) {
this.genericRow = genericRow;
}
+
+    /** Returns the Paimon bucket this event's row is written to. */
+    public int getBucket() {
+        return this.bucket;
+    }
+
+    /** Sets the Paimon bucket this event's row is written to. */
+    public void setBucket(int bucket) {
+        this.bucket = bucket;
+    }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
new file mode 100644
index 000000000..d6837915f
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.event.Event;
+import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
+import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+
+import java.time.ZoneId;
+
+/**
+ * A {@link PaimonSink} for {@link Event}s. Before the writer, it assigns a Paimon bucket to each
+ * event and re-partitions the stream by that bucket.
+ */
+public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTopology<Event> {
+
+    /** UID of the schema operator, forwarded to {@link BucketAssignOperator}. */
+    public final String schemaOperatorUid;
+
+    /** Time zone forwarded to {@link BucketAssignOperator}. */
+    public final ZoneId zoneId;
+
+    public PaimonEventSink(
+            Options catalogOptions,
+            String commitUser,
+            PaimonRecordSerializer<Event> serializer,
+            String schemaOperatorUid,
+            ZoneId zoneId) {
+        super(catalogOptions, commitUser, serializer);
+        this.schemaOperatorUid = schemaOperatorUid;
+        this.zoneId = zoneId;
+    }
+
+    /**
+     * Inserts bucket assignment in front of the writer. The incoming stream (already shuffled by
+     * key hash) goes through {@link BucketAssignOperator}, then is shuffled again by bucket.
+     */
+    @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
+        BucketAssignOperator assignOperator =
+                new BucketAssignOperator(catalogOptions, schemaOperatorUid, zoneId, commitUser);
+        DataStream<Event> bucketed =
+                dataStream
+                        .transform("BucketAssign", new BucketWrapperEventTypeInfo(), assignOperator)
+                        .name("Assign Bucket");
+        // Every event emitted by BucketAssignOperator is decorated with BucketWrapper.
+        return bucketed.partitionCustom(
+                (bucket, numPartitions) -> bucket % numPartitions,
+                event -> ((BucketWrapper) event).getBucket());
+    }
+
+    /** Serializes committables as multi-table committables wrapping Paimon commit messages. */
+    @Override
+    public SimpleVersionedSerializer<MultiTableCommittable> getCommittableSerializer() {
+        return new MultiTableCommittableSerializer(new CommitMessageSerializer());
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
index 53b63f3b5..c3ceb31ac 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.SchemaUtils;
+import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
@@ -51,10 +52,12 @@ public class PaimonRecordEventSerializer implements
PaimonRecordSerializer<Event
@Override
public PaimonEvent serialize(Event event) {
- Identifier tableId =
- Identifier.create(
- ((ChangeEvent) event).tableId().getSchemaName(),
- ((ChangeEvent) event).tableId().getTableName());
+ int bucket = 0;
+ if (event instanceof BucketWrapperChangeEvent) {
+ bucket = ((BucketWrapperChangeEvent) event).getBucket();
+ event = ((BucketWrapperChangeEvent) event).getInnerEvent();
+ }
+ Identifier tableId = Identifier.fromString(((ChangeEvent)
event).tableId().toString());
if (event instanceof SchemaChangeEvent) {
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
@@ -78,7 +81,7 @@ public class PaimonRecordEventSerializer implements
PaimonRecordSerializer<Event
PaimonWriterHelper.convertEventToGenericRow(
dataChangeEvent,
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
- return new PaimonEvent(tableId, genericRow);
+ return new PaimonEvent(tableId, genericRow, false, bucket);
} else {
throw new IllegalArgumentException(
"failed to convert Input into PaimonEvent, unsupported
event: " + event);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
index 5e310b2c3..61824ec44 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
@@ -40,9 +40,9 @@ public class PaimonSink<InputT> implements
WithPreCommitTopology<InputT, MultiTa
// provided a default commit user.
public static final String DEFAULT_COMMIT_USER = "admin";
- private final Options catalogOptions;
+ protected final Options catalogOptions;
- private final String commitUser;
+ protected final String commitUser;
private final PaimonRecordSerializer<InputT> serializer;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index 1b51535b2..87229f36c 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;
@@ -134,7 +133,7 @@ public class PaimonWriter<InputT>
return storeSinkWrite;
});
try {
- write.write(paimonEvent.getGenericRow());
+ write.write(paimonEvent.getGenericRow(),
paimonEvent.getBucket());
} catch (Exception e) {
throw new IOException(e);
}
@@ -142,22 +141,15 @@ public class PaimonWriter<InputT>
}
private FileStoreTable getTable(Identifier tableId) {
- FileStoreTable table =
- tables.computeIfAbsent(
- tableId,
- id -> {
- try {
- return (FileStoreTable)
catalog.getTable(tableId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- if (table.bucketMode() != BucketMode.FIXED) {
- throw new UnsupportedOperationException(
- "Unified Sink only supports FIXED bucket mode, but is " +
table.bucketMode());
- }
- return table;
+ return tables.computeIfAbsent(
+ tableId,
+ id -> {
+ try {
+ return (FileStoreTable) catalog.getTable(tableId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
/**
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index 3d49086b2..941189a0a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -148,7 +148,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
@Override
public SinkRecord write(InternalRow internalRow, int i) throws Exception {
- return write.writeAndReturn(internalRow);
+ return write.writeAndReturn(internalRow, i);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
new file mode 100644
index 000000000..07509574c
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.index.BucketAssigner;
+import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Assign bucket for every given {@link DataChangeEvent}. */
+public class BucketAssignOperator extends AbstractStreamOperator<Event>
+ implements OneInputStreamOperator<Event, Event> {
+
+ public final String commitUser;
+
+ private final Options catalogOptions;
+
+ private Catalog catalog;
+
+ Map<TableId, Tuple4<BucketMode, RowKeyExtractor, BucketAssigner,
RowPartitionKeyExtractor>>
+ bucketAssignerMap;
+
+ // maintain the latest schema of tableId.
+ private Map<TableId, TableSchemaInfo> schemaMaps;
+
+ private int totalTasksNumber;
+
+ private int currentTaskNumber;
+
+ public final String schemaOperatorUid;
+
+ private transient SchemaEvolutionClient schemaEvolutionClient;
+
+ private final ZoneId zoneId;
+
+ public BucketAssignOperator(
+ Options catalogOptions, String schemaOperatorUid, ZoneId zoneId,
String commitUser) {
+ this.catalogOptions = catalogOptions;
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.schemaOperatorUid = schemaOperatorUid;
+ this.commitUser = commitUser;
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ this.bucketAssignerMap = new HashMap<>();
+ this.totalTasksNumber =
getRuntimeContext().getNumberOfParallelSubtasks();
+ this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
+ this.schemaMaps = new HashMap<>();
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Event>> output) {
+ super.setup(containingTask, config, output);
+ TaskOperatorEventGateway toCoordinator =
+
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
+ schemaEvolutionClient =
+ new SchemaEvolutionClient(
+ toCoordinator, new
OperatorIDGenerator(schemaOperatorUid).generate());
+ }
+
+ @Override
+ public void processElement(StreamRecord<Event> streamRecord) throws
Exception {
+ Event event = streamRecord.getValue();
+ if (event instanceof FlushEvent) {
+ output.collect(
+ new StreamRecord<>(
+ new BucketWrapperFlushEvent(
+ currentTaskNumber, ((FlushEvent)
event).getTableId())));
+ return;
+ }
+
+ if (event instanceof DataChangeEvent) {
+ DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+ if (schemaMaps.containsKey(dataChangeEvent.tableId())) {
+ Optional<Schema> schema =
+
schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId());
+ if (schema.isPresent()) {
+ schemaMaps.put(
+ dataChangeEvent.tableId(), new
TableSchemaInfo(schema.get(), zoneId));
+ } else {
+ throw new RuntimeException(
+ "Could not find schema message from SchemaRegistry
for "
+ + dataChangeEvent.tableId());
+ }
+ }
+ Tuple4<BucketMode, RowKeyExtractor, BucketAssigner,
RowPartitionKeyExtractor> tuple4 =
+ bucketAssignerMap.computeIfAbsent(
+ dataChangeEvent.tableId(), this::getTableInfo);
+ int bucket;
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(
+ dataChangeEvent,
+
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
+ switch (tuple4.f0) {
+ case DYNAMIC:
+ {
+ bucket =
+ tuple4.f2.assign(
+ tuple4.f3.partition(genericRow),
+
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
+ break;
+ }
+ case FIXED:
+ {
+ tuple4.f1.setRecord(genericRow);
+ bucket = tuple4.f1.bucket();
+ break;
+ }
+ case UNAWARE:
+ {
+ bucket = 0;
+ break;
+ }
+ case GLOBAL_DYNAMIC:
+ default:
+ {
+ throw new RuntimeException("Unsupported bucket mode: "
+ tuple4.f0);
+ }
+ }
+ output.collect(
+ new StreamRecord<>(new BucketWrapperChangeEvent(bucket,
(ChangeEvent) event)));
+ } else if (event instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ schemaMaps.put(
+ createTableEvent.tableId(),
+ new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
+ output.collect(
+ new StreamRecord<>(
+ new BucketWrapperChangeEvent(currentTaskNumber,
(ChangeEvent) event)));
+ } else if (event instanceof SchemaChangeEvent) {
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+ Schema schema =
+ SchemaUtils.applySchemaChangeEvent(
+
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
+ schemaChangeEvent);
+ schemaMaps.put(schemaChangeEvent.tableId(), new
TableSchemaInfo(schema, zoneId));
+ output.collect(
+ new StreamRecord<>(
+ new BucketWrapperChangeEvent(currentTaskNumber,
(ChangeEvent) event)));
+ }
+ }
+
+ private Tuple4<BucketMode, RowKeyExtractor, BucketAssigner,
RowPartitionKeyExtractor>
+ getTableInfo(TableId tableId) {
+ Preconditions.checkNotNull(tableId, "Invalid tableId in given event.");
+ FileStoreTable table;
+ try {
+ table = (FileStoreTable)
catalog.getTable(Identifier.fromString(tableId.toString()));
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
+ Integer numAssigners =
table.coreOptions().dynamicBucketInitialBuckets();
+ return new Tuple4<>(
+ table.bucketMode(),
+ table.createRowKeyExtractor(),
+ new HashBucketAssigner(
+ table.snapshotManager(),
+ commitUser,
+ table.store().newIndexFileHandler(),
+ totalTasksNumber,
+ MathUtils.min(numAssigners, totalTasksNumber),
+ currentTaskNumber,
+ targetRowNum),
+ new RowPartitionKeyExtractor(table.schema()));
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
new file mode 100644
index 000000000..ed56cee97
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
/**
 * Implemented by events that carry a bucket value alongside their payload.
 */
public interface BucketWrapper {

    /**
     * Returns the bucket value meant to be passed to {@link StoreSinkWrite#write(InternalRow,
     * int)}.
     *
     * @return the carried bucket value
     */
    int getBucket();
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
new file mode 100644
index 000000000..6c0b3c155
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A wrapper class for {@link ChangeEvent} to attach bucket id. */
+public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper,
Serializable {
+ private static final long serialVersionUID = 1L;
+ private final int bucket;
+
+ private final ChangeEvent innerEvent;
+
+ public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) {
+ this.bucket = bucket;
+ this.innerEvent = innerEvent;
+ }
+
+ public int getBucket() {
+ return bucket;
+ }
+
+ public ChangeEvent getInnerEvent() {
+ return innerEvent;
+ }
+
+ @Override
+ public TableId tableId() {
+ return innerEvent.tableId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o;
+ return bucket == that.bucket && Objects.equals(innerEvent,
that.innerEvent);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucket, innerEvent);
+ }
+
+ @Override
+ public String toString() {
+ return "BucketWrapperChangeEvent{"
+ + "bucket="
+ + bucket
+ + ", innerEvent="
+ + innerEvent
+ + '}';
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
new file mode 100644
index 000000000..f37d9952f
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
+import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/** A {@link TypeSerializerSingleton} for {@link BucketWrapperChangeEvent}. */
+public class BucketWrapperEventSerializer extends
TypeSerializerSingleton<Event> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final EnumSerializer<EventClass> enumSerializer =
+ new EnumSerializer<>(EventClass.class);
+
+ private final EventSerializer eventSerializer = EventSerializer.INSTANCE;
+
+ private final TableIdSerializer tableIdSerializer =
TableIdSerializer.INSTANCE;
+
+ /** Sharable instance of the TableIdSerializer. */
+ public static final BucketWrapperEventSerializer INSTANCE = new
BucketWrapperEventSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public Event createInstance() {
+ return new Event() {};
+ }
+
+ @Override
+ public Event copy(Event event) {
+ return event;
+ }
+
+ @Override
+ public Event copy(Event from, Event reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(Event event, DataOutputView dataOutputView) throws
IOException {
+ if (event instanceof BucketWrapperChangeEvent) {
+ BucketWrapperChangeEvent bucketWrapperChangeEvent =
(BucketWrapperChangeEvent) event;
+ enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT,
dataOutputView);
+ dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
+
eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(),
dataOutputView);
+ } else if (event instanceof BucketWrapperFlushEvent) {
+ enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT,
dataOutputView);
+ BucketWrapperFlushEvent bucketWrapperFlushEvent =
(BucketWrapperFlushEvent) event;
+ dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
+ tableIdSerializer.serialize(bucketWrapperFlushEvent.getTableId(),
dataOutputView);
+ }
+ }
+
+ @Override
+ public Event deserialize(DataInputView source) throws IOException {
+ EventClass eventClass = enumSerializer.deserialize(source);
+ if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
+ return new BucketWrapperFlushEvent(
+ source.readInt(), tableIdSerializer.deserialize(source));
+ } else {
+ return new BucketWrapperChangeEvent(
+ source.readInt(), (ChangeEvent)
eventSerializer.deserialize(source));
+ }
+ }
+
+ @Override
+ public Event deserialize(Event reuse, DataInputView source) throws
IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ serialize(deserialize(source), target);
+ }
+
+ @Override
+ public TypeSerializerSnapshot<Event> snapshotConfiguration() {
+ return new EventSerializerSnapshot();
+ }
+
+ /** Serializer configuration snapshot for compatibility and format
evolution. */
+ @SuppressWarnings("WeakerAccess")
+ public static final class EventSerializerSnapshot extends
SimpleTypeSerializerSnapshot<Event> {
+
+ public EventSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
+
+ enum EventClass {
+ BUCKET_WRAPPER_CHANGE_EVENT,
+ BUCKET_WRAPPER_FLUSH_EVENT
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
new file mode 100644
index 000000000..be1c54bfa
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEvent;
+
+/** A {@link TypeInformation} for {@link PaimonEvent}. */
+public class BucketWrapperEventTypeInfo extends TypeInformation<Event> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<Event> getTypeClass() {
+ return Event.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Event> createSerializer(ExecutionConfig config) {
+ return BucketWrapperEventSerializer.INSTANCE;
+ }
+
+ @Override
+ public String toString() {
+ return "BucketWrapperEvent";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof BucketWrapperEventTypeInfo;
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof BucketWrapperEventTypeInfo;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
new file mode 100644
index 000000000..046a22a31
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.util.Objects;
+
+/** A wrapper class for {@link FlushEvent} to attach bucket id. */
+public class BucketWrapperFlushEvent extends FlushEvent implements
BucketWrapper {
+
+ private final int bucket;
+
+ public BucketWrapperFlushEvent(int bucket, TableId tableId) {
+ super(tableId);
+ this.bucket = bucket;
+ }
+
+ @Override
+ public int getBucket() {
+ return bucket;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o;
+ return bucket == that.bucket;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), bucket);
+ }
+
+ @Override
+ public String toString() {
+ return "BucketWrapperFlushEvent{tableId=" + getTableId() + ", bucket="
+ bucket + '}';
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
new file mode 100644
index 000000000..86ff2f7b2
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PaimonHashFunction}. */
+public class PaimonHashFunctionTest {
+
+ @TempDir public static Path temporaryFolder;
+
+ private Catalog catalog;
+
+ private Options catalogOptions;
+
+ private static final String TEST_DATABASE = "test";
+
+ @BeforeEach
+ public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+ catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ catalog.createDatabase(TEST_DATABASE, true);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ catalog.dropDatabase(TEST_DATABASE, true, true);
+ catalog.close();
+ }
+
+ @Test
+ public void testHashCodeForFixedBucketTable() {
+ TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("bucket", "10");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("col2", DataTypes.STRING())
+ .physicalColumn("pt", DataTypes.STRING())
+ .primaryKey("col1", "pt")
+ .partitionKey("pt")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+ BinaryRecordDataGenerator generator =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+ PaimonHashFunction hashFunction =
+ new PaimonHashFunction(catalogOptions, tableId, schema,
ZoneId.systemDefault(), 4);
+ DataChangeEvent dataChangeEvent1 =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key1 = hashFunction.hashcode(dataChangeEvent1);
+
+ DataChangeEvent dataChangeEvent2 =
+ DataChangeEvent.updateEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2024")
+ }),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key2 = hashFunction.hashcode(dataChangeEvent2);
+
+ DataChangeEvent dataChangeEvent3 =
+ DataChangeEvent.deleteEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key3 = hashFunction.hashcode(dataChangeEvent3);
+
+ assertThat(key1).isEqualTo(key2);
+ assertThat(key1).isEqualTo(key3);
+ }
+}