[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangdingxin updated FLINK-35237:
---------------------------------
Description: 
The {{PrePartitionOperator}} in its current implementation only supports a fixed {{HashFunction}} ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). This limits the ability of Sink implementations to customize the partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned tables, it would be advantageous to allow hashing based on partition keys, hashing by table name, or using the database engine's internal primary-key hash functions (such as with the MaxCompute DataSink).

When users require such custom partitioning logic, they are compelled to implement their own PartitionOperator, which undermines the utility of {{{}PrePartitionOperator{}}}.

To address this limitation, it would be highly desirable to enable the {{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s (Function<DataChangeEvent, Integer>). A possible solution could involve a mechanism analogous to the {{DataSink}} interface, allowing the specification of a {{HashFunctionProvider}} class path in the configuration file. This enhancement would greatly facilitate users in tailoring partition strategies to their specific application needs.

In this case, I want to create the new interfaces {{HashFunctionProvider}} and {{{}HashFunction{}}}:
{code:java}
public interface HashFunctionProvider {
    HashFunction getHashFunction(Schema schema);
}

public interface HashFunction extends Function<DataChangeEvent, Integer> {
    Integer apply(DataChangeEvent event);
}
{code}
add a default {{getHashFunctionProvider}} method to {{DataSink}}:
{code:java}
public interface DataSink {

    /** Get the {@link EventSinkProvider} for writing changed data to external systems. */
    EventSinkProvider getEventSinkProvider();

    /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
    MetadataApplier getMetadataApplier();

    default HashFunctionProvider getHashFunctionProvider() {
        return new DefaultHashFunctionProvider();
    }
}
{code}
and re-implement the {{PrePartitionOperator}} {{recreateHashFunction}} method:
{code:java}
private HashFunction recreateHashFunction(TableId tableId) {
    return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
}
{code}

was:
The {{PrePartitionOperator}} in its current implementation only supports a fixed {{HashFunction}} ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). This limits the ability of Sink implementations to customize the partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned tables, it would be advantageous to allow hashing based on partition keys, hashing according to table names, or using the database engine's internal primary key hash functions (such as with MaxCompute DataSink).

When users require such custom partitioning logic, they are compelled to implement their own PartitionOperator, which undermines the utility of {{{}PrePartitionOperator{}}}.

To address this limitation, it would be highly desirable to enable the {{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s (Function<DataChangeEvent, Integer>). A possible solution could involve a mechanism analogous to the {{DataSink}} interface, allowing the specification of a {{HashFunctionFactory}} class path in the configuration file. This enhancement would greatly facilitate users in tailoring partition strategies to meet their specific application needs.
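To illustrate the proposed extension point, a sink could supply a provider that hashes only on the table identifier, so that all events of one table are routed to the same downstream subtask. The sketch below is hypothetical ({{TableNameHashFunctionProvider}} is not an existing class), and it declares simplified stand-ins for the Flink CDC {{DataChangeEvent}} and {{Schema}} types purely to keep the example self-contained:
{code:java}
import java.util.function.Function;

// Simplified stand-ins for the real Flink CDC types, declared here only so
// the sketch compiles on its own; they are NOT the actual CDC classes.
interface DataChangeEvent { String tableId(); }
interface Schema {}

// The two interfaces proposed in this issue.
interface HashFunction extends Function<DataChangeEvent, Integer> {
    Integer apply(DataChangeEvent event);
}

interface HashFunctionProvider {
    HashFunction getHashFunction(Schema schema);
}

// Hypothetical custom provider: partition purely by table name, so every
// event of the same table lands on the same downstream subtask.
class TableNameHashFunctionProvider implements HashFunctionProvider {
    @Override
    public HashFunction getHashFunction(Schema schema) {
        return event -> event.tableId().hashCode();
    }
}
{code}
A sink would then override {{getHashFunctionProvider()}} to return this provider, and {{PrePartitionOperator}} would obtain the per-table {{HashFunction}} through the rewritten {{recreateHashFunction}}.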
Summary: Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization  (was: Allow Custom HashFunction in PrePartitionOperator for Flink Sink Customization)

> Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-35237
>                 URL: https://issues.apache.org/jira/browse/FLINK-35237
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: zhangdingxin
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)