[
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu updated FLINK-35237:
-------------------------------
Affects Version/s: cdc-3.1.1
> Allow Sink to Choose HashFunction in PrePartitionOperator
> ---------------------------------------------------------
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: zhangdingxin
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> The {{PrePartitionOperator}} in its current implementation only supports a
> fixed {{HashFunction}}
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
> This limits the ability of Sink implementations to customize the
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of
> partitioned tables, it would be advantageous to allow hashing based on
> partition keys, hashing according to table names, or using the database
> engine's internal primary key hash functions (such as with MaxCompute
> DataSink).
> When users require such custom partitioning logic, they are compelled to
> implement their own {{PartitionOperator}}, which undermines the utility of
> {{{}PrePartitionOperator{}}}.
> To address this limitation, it would be highly desirable to enable the
> {{PrePartitionOperator}} to support user-specified custom
> {{{}HashFunction{}}}s (Function<DataChangeEvent, Integer>). A possible
> solution could involve a mechanism analogous to the {{DataSink}} interface,
> allowing the specification of a {{HashFunctionProvider}} class path in the
> configuration file. This enhancement would greatly facilitate users in
> tailoring partition strategies to meet their specific application needs.
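> A minimal sketch of how such class-path-based loading could work (the loader class and method name below are hypothetical illustrations of this proposal, not existing Flink CDC code):

```java
// Sketch: instantiate a user-configured provider from a class name read out
// of the configuration file. The class named there must be on the classpath
// and expose a public no-arg constructor.
final class HashFunctionProviderLoader {

    static Object loadProvider(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "Cannot instantiate configured HashFunctionProvider: " + className, e);
        }
    }
}
```

> The returned object would then be cast to the proposed {{HashFunctionProvider}} interface, mirroring how {{DataSink}} factories are already resolved by name.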
> In this case, I want to create two new interfaces, {{HashFunctionProvider}}
> and {{{}HashFunction{}}}:
> {code:java}
> public interface HashFunctionProvider {
>     HashFunction getHashFunction(Schema schema);
> }
>
> public interface HashFunction extends Function<DataChangeEvent, Integer> {
>     Integer apply(DataChangeEvent event);
> }
> {code}
> add a {{getHashFunctionProvider}} method to {{{}DataSink{}}}:
>
> {code:java}
> public interface DataSink {
>
>     /** Get the {@link EventSinkProvider} for writing changed data to external systems. */
>     EventSinkProvider getEventSinkProvider();
>
>     /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
>     MetadataApplier getMetadataApplier();
>
>     default HashFunctionProvider getHashFunctionProvider() {
>         return new DefaultHashFunctionProvider();
>     }
> }
> {code}
> and re-implement the {{PrePartitionOperator}} method {{{}recreateHashFunction{}}}:
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
>     return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)