lvyanquan commented on code in PR #4423:
URL: https://github.com/apache/flink-cdc/pull/4423#discussion_r3346221312
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -136,5 +136,24 @@ public class PipelineOptions {
.withDescription(
"The timeout time for SchemaOperator to wait
downstream SchemaChangeEvent applying finished, the default value is 3
minutes.");
+ public static final ConfigOption<HashFunctionStrategy>
PIPELINE_PARTITIONING_STRATEGY =
+ ConfigOptions.key("partitioning.strategy")
+ .enumType(HashFunctionStrategy.class)
+ .noDefaultValue()
Review Comment:
The current option uses `null` (via `.noDefaultValue()`) to mean "use the
sink's own provider", which is an implicit convention. I'd suggest adding a
third enum value `SINK_DEFINED` in `HashFunctionStrategy` and making it the
default.
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/TableIdHashFunctionProvider.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.sink;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A {@link HashFunctionProvider} implementation that hashes events based
solely on {@link TableId}.
+ *
+ * <p>This provider ensures all events from the same table are routed to the
same downstream
+ * subtask, regardless of their primary key values or record payload. This is
useful when per-table
+ * ordering semantics are required.
+ */
+public class TableIdHashFunctionProvider implements
HashFunctionProvider<DataChangeEvent> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId
tableId, Schema schema) {
+ return TableIdHashFunction.INSTANCE;
+ }
+
+ /** A {@link HashFunction} that computes hash based solely on TableId. */
+ static class TableIdHashFunction implements HashFunction<DataChangeEvent> {
+
+ static final TableIdHashFunction INSTANCE = new TableIdHashFunction();
+
+ private TableIdHashFunction() {}
+
+ @Override
+ public int hashcode(DataChangeEvent event) {
+ List<Object> objectsToHash = new ArrayList<>();
+ TableId tableId = event.tableId();
+
Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
+
Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
+ objectsToHash.add(tableId.getTableName());
+
+ // Calculate hash
+ return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
Review Comment:
TableIdHashFunctionProvider.java:55-65 allocates an ArrayList and calls
`Objects.hash(...)` on **every event**, but the result depends only on
`event.tableId()` — which is already passed to `getHashFunction(tableId,
schema)`.
It would be better to drop the `INSTANCE` singleton and have the provider
new a `TableIdHashFunction(tableId)` per call, computing the hash once in the
constructor and just returning the cached int from `hashcode(event)`.
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -136,5 +136,24 @@ public class PipelineOptions {
.withDescription(
"The timeout time for SchemaOperator to wait
downstream SchemaChangeEvent applying finished, the default value is 3
minutes.");
+ public static final ConfigOption<HashFunctionStrategy>
PIPELINE_PARTITIONING_STRATEGY =
+ ConfigOptions.key("partitioning.strategy")
Review Comment:
Please update the
[document](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/core-concept/data-pipeline/#pipeline-configurations)
to guide users on how to use it.
##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/sink/TableIdHashFunctionProviderTest.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.sink;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link TableIdHashFunctionProvider}. */
+class TableIdHashFunctionProviderTest {
+
+ private static final TableId TABLE_A = TableId.tableId("namespace",
"schema", "table_a");
+ private static final TableId TABLE_B = TableId.tableId("namespace",
"schema", "table_b");
+ private static final TableId TABLE_C = TableId.tableId("other_namespace",
"schema", "table_a");
+
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ private final TableIdHashFunctionProvider provider = new
TableIdHashFunctionProvider();
+
+ @Test
+ void testSameTableSameHash() {
+ HashFunction<DataChangeEvent> hashFunction =
provider.getHashFunction(TABLE_A, SCHEMA);
+
+ DataChangeEvent event1 = createInsertEvent(TABLE_A, 1, "Alice");
+ DataChangeEvent event2 = createInsertEvent(TABLE_A, 2, "Bob");
+ DataChangeEvent event3 = createInsertEvent(TABLE_A, 3, "Charlie");
+
+ int hash1 = hashFunction.hashcode(event1);
+ int hash2 = hashFunction.hashcode(event2);
+ int hash3 = hashFunction.hashcode(event3);
+
+ // All events from the same table should have the same hash
+ assertThat(hash1).isEqualTo(hash2);
+ assertThat(hash2).isEqualTo(hash3);
+ }
+
+ @Test
+ void testDifferentTableDifferentHash() {
+ HashFunction<DataChangeEvent> hashFunctionA =
provider.getHashFunction(TABLE_A, SCHEMA);
+ HashFunction<DataChangeEvent> hashFunctionB =
provider.getHashFunction(TABLE_B, SCHEMA);
+
+ DataChangeEvent eventA = createInsertEvent(TABLE_A, 1, "Alice");
+ DataChangeEvent eventB = createInsertEvent(TABLE_B, 1, "Alice");
+
+ int hashA = hashFunctionA.hashcode(eventA);
+ int hashB = hashFunctionB.hashcode(eventB);
+
+ // Events from different tables should have different hashes
+ assertThat(hashA).isNotEqualTo(hashB);
+ }
+
+ @Test
+ void testDifferentNamespaceDifferentHash() {
+ HashFunction<DataChangeEvent> hashFunctionA =
provider.getHashFunction(TABLE_A, SCHEMA);
+ HashFunction<DataChangeEvent> hashFunctionC =
provider.getHashFunction(TABLE_C, SCHEMA);
+
+ DataChangeEvent eventA = createInsertEvent(TABLE_A, 1, "Alice");
+ DataChangeEvent eventC = createInsertEvent(TABLE_C, 1, "Alice");
+
+ int hashA = hashFunctionA.hashcode(eventA);
+ int hashC = hashFunctionC.hashcode(eventC);
+
+ // Events from different namespaces should have different hashes
+ assertThat(hashA).isNotEqualTo(hashC);
+ }
+
+ @Test
+ void testHashIsPositive() {
+ HashFunction<DataChangeEvent> hashFunction =
provider.getHashFunction(TABLE_A, SCHEMA);
+
+ DataChangeEvent event = createInsertEvent(TABLE_A, 1, "Alice");
+ int hash = hashFunction.hashcode(event);
+
+ // Hash should always be non-negative
+ assertThat(hash).isGreaterThanOrEqualTo(0);
+ }
+
+ @Test
+ void testHashIgnoresPrimaryKeyValue() {
+ HashFunction<DataChangeEvent> hashFunction =
provider.getHashFunction(TABLE_A, SCHEMA);
+
+ // Create events with different primary key values
+ DataChangeEvent eventPk1 = createInsertEvent(TABLE_A, 1, "Alice");
+ DataChangeEvent eventPk2 = createInsertEvent(TABLE_A, 2, "Bob");
+ DataChangeEvent eventPk3 = createInsertEvent(TABLE_A, 100, "Charlie");
+
+ int hash1 = hashFunction.hashcode(eventPk1);
+ int hash2 = hashFunction.hashcode(eventPk2);
+ int hash3 = hashFunction.hashcode(eventPk3);
+
+ // Hash should be the same regardless of primary key values
+ assertThat(hash1).isEqualTo(hash2);
+ assertThat(hash2).isEqualTo(hash3);
+ }
+
+ @Test
+ void testHashIgnoresOperationType() {
+ HashFunction<DataChangeEvent> hashFunction =
provider.getHashFunction(TABLE_A, SCHEMA);
+
+ DataChangeEvent insertEvent = createInsertEvent(TABLE_A, 1, "Alice");
+ DataChangeEvent updateEvent = createUpdateEvent(TABLE_A, 1, "Alice",
"Alice_Updated");
+ DataChangeEvent deleteEvent = createDeleteEvent(TABLE_A, 1, "Alice");
+
+ int insertHash = hashFunction.hashcode(insertEvent);
+ int updateHash = hashFunction.hashcode(updateEvent);
+ int deleteHash = hashFunction.hashcode(deleteEvent);
+
+ // Hash should be the same regardless of operation type
+ assertThat(insertHash).isEqualTo(updateHash);
+ assertThat(updateHash).isEqualTo(deleteHash);
+ }
+
+ @Test
+ void testSingletonInstance() {
+ HashFunction<DataChangeEvent> hashFunction1 =
provider.getHashFunction(TABLE_A, SCHEMA);
+ HashFunction<DataChangeEvent> hashFunction2 =
provider.getHashFunction(TABLE_B, SCHEMA);
+
+ // Both should return the same singleton instance
+ assertThat(hashFunction1).isSameAs(hashFunction2);
+ }
+
+ private DataChangeEvent createInsertEvent(TableId tableId, int id, String
name) {
+ return DataChangeEvent.insertEvent(
+ tableId,
+ org.apache.flink.cdc.common.data.GenericRecordData.of(
Review Comment:
Add an import instead of fully-qualified
name for consistency with the rest of the codebase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]