[ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35237:
---------------------------------
    Summary: Allow Sink to Choose HashFunction in PrePartitionOperator  (was: 
Allow Sink to Choose HashFunction in PrePartitionOperato)

> Allow Sink to Choose HashFunction in PrePartitionOperator
> ---------------------------------------------------------
>
>                 Key: FLINK-35237
>                 URL: https://issues.apache.org/jira/browse/FLINK-35237
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: zhangdingxin
>            Priority: Major
>
> The {{PrePartitionOperator}} in its current implementation only supports a 
> fixed {{HashFunction}} 
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
>  This limits the ability of Sink implementations to customize the 
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of 
> partitioned tables, it would be advantageous to allow hashing based on 
> partition keys, hashing according to table names, or using the database 
> engine's internal primary key hash functions (such as with MaxCompute 
> DataSink).
> When users require such custom partitioning logic, they are compelled to 
> implement their PartitionOperator, which undermines the utility of 
> {{{}PrePartitionOperator{}}}.
> To address this limitation, it would be highly desirable to enable the 
> {{PrePartitionOperator}} to support user-specified custom 
> {{{}HashFunction{}}}s (Function<DataChangeEvent, Integer>). A possible 
> solution could involve a mechanism analogous to the {{DataSink}} interface, 
> allowing the specification of a {{HashFunctionProvider}} class path in the 
> configuration file. This enhancement would greatly facilitate users in 
> tailoring partition strategies to meet their specific application needs.
> In this case, I want to create new class {{HashFunctionProvider}} and 
> {{{}HashFunction{}}}:
> {code:java}
> public interface HashFunctionProvider {
>     HashFunction getHashFunction(Schema schema);
> }
> public interface HashFunction extends Function<DataChangeEvent, Integer> {
>     Integer apply(DataChangeEvent event);
> } {code}
> add {{getHashFunctionProvider}} method to {{DataSink}}
>  
> {code:java}
> public interface DataSink {
>     /** Get the {@link EventSinkProvider} for writing changed data to 
> external systems. */
>     EventSinkProvider getEventSinkProvider();
>     /** Get the {@link MetadataApplier} for applying metadata changes to 
> external systems. */
>     MetadataApplier getMetadataApplier();
>     default HashFunctionProvider getHashFunctionProvider() {
>         return new DefaultHashFunctionProvider();
>     }
> } {code}
> and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method.
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
>     return 
> hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to