[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangdingxin updated FLINK-35237:
---------------------------------
    Description: 
The {{PrePartitionOperator}} in its current implementation only supports a 
fixed {{HashFunction}} 
({{org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction}}). 
This limits the ability of Sink implementations to customize the partitioning 
logic for {{DataChangeEvent}}s. For example, in the case of partitioned 
tables, it would be advantageous to allow hashing based on partition keys, 
hashing by table name, or using the database engine's internal primary-key 
hash functions (as with the MaxCompute DataSink).

When users require such custom partitioning logic, they are forced to 
implement their own {{PartitionOperator}}, which undermines the utility of 
{{PrePartitionOperator}}.

To address this limitation, the {{PrePartitionOperator}} should support 
user-specified custom {{HashFunction}}s ({{Function<DataChangeEvent, 
Integer>}}). A possible solution is a mechanism analogous to the {{DataSink}} 
interface: allow a {{HashFunctionProvider}} class path to be specified in the 
configuration file. This would let users tailor partition strategies to their 
specific application needs.
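
For illustration, resolving such a configured class path could amount to a 
reflective instantiation. This is a minimal sketch; the helper name and the 
surrounding configuration plumbing are assumptions, not part of this proposal.
{code:java}
// Sketch only: create a HashFunctionProvider from a class path read out of
// the pipeline configuration (hypothetical helper).
public static HashFunctionProvider loadHashFunctionProvider(String classPath) {
    try {
        return (HashFunctionProvider)
                Class.forName(classPath).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
        throw new IllegalArgumentException(
                "Cannot create HashFunctionProvider from: " + classPath, e);
    }
} {code}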

In this case, I propose two new interfaces, {{HashFunctionProvider}} and 
{{HashFunction}}:
{code:java}
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;

import java.util.function.Function;

/** Creates a {@link HashFunction} for the given table schema. */
public interface HashFunctionProvider {
    HashFunction getHashFunction(Schema schema);
}

/** Computes the hash used to route a {@link DataChangeEvent} to a partition. */
public interface HashFunction extends Function<DataChangeEvent, Integer> {
    Integer apply(DataChangeEvent event);
} {code}
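As a usage sketch (assuming the interfaces above and 
{{DataChangeEvent#tableId()}} from Flink CDC), a provider that routes all 
events of one table to the same partition could look like this; the class 
name is hypothetical:
{code:java}
/** Hypothetical provider that hashes events by table name only. */
public class TableNameHashFunctionProvider implements HashFunctionProvider {
    @Override
    public HashFunction getHashFunction(Schema schema) {
        // The schema is unused here; a partition-key-aware provider could
        // derive the key fields from it instead.
        return event -> event.tableId().hashCode();
    }
} {code}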
Then, add a default {{getHashFunctionProvider}} method to {{DataSink}}:
{code:java}
public interface DataSink {

    /** Get the {@link EventSinkProvider} for writing changed data to external systems. */
    EventSinkProvider getEventSinkProvider();

    /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
    MetadataApplier getMetadataApplier();

    /** Get the {@link HashFunctionProvider} used to partition {@code DataChangeEvent}s. */
    default HashFunctionProvider getHashFunctionProvider() {
        return new DefaultHashFunctionProvider();
    }
} {code}
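A concrete sink, such as the MaxCompute sink mentioned above, would then only 
override the default. A minimal sketch, with hypothetical class names:
{code:java}
/** Hypothetical sink that plugs in an engine-specific hash function. */
public class MaxComputeDataSink implements DataSink {
    @Override
    public EventSinkProvider getEventSinkProvider() {
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public MetadataApplier getMetadataApplier() {
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public HashFunctionProvider getHashFunctionProvider() {
        // Route events with the engine's primary-key hash instead of the
        // default hash function.
        return new MaxComputeHashFunctionProvider();
    }
} {code}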
Finally, re-implement the {{recreateHashFunction}} method of 
{{PrePartitionOperator}}:
{code:java}
private HashFunction recreateHashFunction(TableId tableId) {
    return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
} {code}
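For completeness, the operator could cache one {{HashFunction}} per table and 
consult it when choosing the target channel. The field and method names below 
mirror the operator's structure but are assumptions:
{code:java}
// Sketch: pick the downstream channel for an event. cachedHashFunctions is
// an assumed Map<TableId, HashFunction>; downstreamParallelism is the number
// of downstream subtasks.
private int selectChannel(DataChangeEvent event) {
    HashFunction hashFunction =
            cachedHashFunctions.computeIfAbsent(event.tableId(), this::recreateHashFunction);
    return Math.floorMod(hashFunction.apply(event), downstreamParallelism);
} {code}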
 

  was:
The {{PrePartitionOperator}} in its current implementation only supports a 
fixed {{HashFunction}} 
({{org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction}}). 
This limits the ability of Sink implementations to customize the partitioning 
logic for {{DataChangeEvent}}s. For example, in the case of partitioned 
tables, it would be advantageous to allow hashing based on partition keys, 
hashing by table name, or using the database engine's internal primary-key 
hash functions (as with the MaxCompute DataSink).

When users require such custom partitioning logic, they are forced to 
implement their own {{PartitionOperator}}, which undermines the utility of 
{{PrePartitionOperator}}.

To address this limitation, the {{PrePartitionOperator}} should support 
user-specified custom {{HashFunction}}s ({{Function<DataChangeEvent, 
Integer>}}). A possible solution is a mechanism analogous to the {{DataSink}} 
interface: allow a {{HashFunctionFactory}} class path to be specified in the 
configuration file. This would let users tailor partition strategies to their 
specific application needs.

        Summary: Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization  (was: Allow Custom HashFunction in PrePartitionOperator for Flink Sink Customization)

> Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink 
> Customization
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-35237
>                 URL: https://issues.apache.org/jira/browse/FLINK-35237
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: zhangdingxin
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
