[ https://issues.apache.org/jira/browse/FLINK-39777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yanquan Lv reassigned FLINK-39777:
----------------------------------

    Assignee: Mao Jiayi

> Support configurable HashFunction strategies in PrePartitionOperator
> --------------------------------------------------------------------
>
>                 Key: FLINK-39777
>                 URL: https://issues.apache.org/jira/browse/FLINK-39777
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>            Reporter: Yanquan Lv
>            Assignee: Mao Jiayi
>            Priority: Major
>
> h2. Background
> In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between
> {{SchemaOperator}} and {{EventPartitioner}}. For every
> {{DataChangeEvent}}, it computes a hash that decides which downstream
> subtask the event is routed to.
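> To illustrate the final routing step, here is a minimal sketch that turns a hash into a
> subtask index, assuming a plain modulo over the downstream parallelism. {{SubtaskRouting}}
> and {{subtaskFor}} are hypothetical names, not Flink CDC API:

```java
/** Hypothetical helper: maps a partitioning hash onto one of N downstream subtasks. */
final class SubtaskRouting {
    private SubtaskRouting() {}

    /**
     * Assumed modulo scheme. Math.floorMod always returns a value in [0, parallelism),
     * even for negative hashes, whereas the % operator can return a negative index.
     */
    static int subtaskFor(int hash, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive, got " + parallelism);
        }
        return Math.floorMod(hash, parallelism);
    }

    public static void main(String[] args) {
        // A negative hash still yields a valid index: -7 % 4 == -3, but floorMod(-7, 4) == 1.
        System.out.println(subtaskFor(-7, 4));
        System.out.println(subtaskFor("inventory.products".hashCode(), 4));
    }
}
```

> {{Math.abs(hash) % parallelism}} is a common alternative, but it breaks for
> {{Integer.MIN_VALUE}} because {{Math.abs(Integer.MIN_VALUE)}} is still negative.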
> Today, the hashing behavior is fully owned by {{HashFunctionProvider}}:
>  * The default implementation, {{DefaultDataChangeEventHashFunctionProvider}},
> hashes by {{TableId + primary keys}} (see
> {{flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java}}).
>  * Sink connectors can ship their own implementation (e.g.
> {{PaimonHashFunctionProvider}}, {{MaxComputeHashFunctionProvider}},
> {{FlussHashFunctionProvider}}).
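> To make "hash by {{TableId + primary keys}}" concrete, the sketch below approximates it
> with simplified types: the table is a plain string and a row is a {{Map}} from column name
> to value. The real provider works on Flink CDC's record and schema types, so
> {{PrimaryKeyHashSketch}} and its method are illustrative assumptions only. The property
> that matters is that only the primary-key columns feed the hash:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical approximation of hashing by TableId + primary keys. */
final class PrimaryKeyHashSketch {
    private PrimaryKeyHashSketch() {}

    /**
     * Folds the table identity and the primary-key values (in PK declaration order)
     * into one hash. Non-PK columns are never read.
     */
    static int hash(String tableId, List<String> primaryKeys, Map<String, Object> row) {
        List<Object> parts = new ArrayList<>();
        parts.add(tableId);
        for (String pk : primaryKeys) {
            parts.add(row.get(pk));
        }
        // List.hashCode() is fully specified by the List contract, so the result is
        // deterministic as long as the element types have deterministic hashCodes.
        return parts.hashCode();
    }

    public static void main(String[] args) {
        List<String> pks = new ArrayList<>();
        pks.add("id");
        Map<String, Object> before = new HashMap<>();
        before.put("id", 42);
        before.put("price", 10);
        Map<String, Object> after = new HashMap<>(before);
        after.put("price", 12); // an UPDATE that only touches a non-PK column
        System.out.println(hash("shop.orders", pks, before) == hash("shop.orders", pks, after)); // true
    }
}
```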
> In real-world deployments, "hash by primary key" is not always the right 
> strategy:
>  # *All events of one table should land on the same subtask.* Some sinks want 
> strict per-table ordering (single-writer-per-table semantics, 
> single-partition writes, downstream parallelism organized by table). Hashing 
> by primary key spreads a single table across subtasks and can introduce 
> cross-subtask reordering.
>  # *Missing or changing primary keys.* When a table has no primary key,
> hashing by primary key may not produce a better distribution than hashing by
> table; when a schema change touches the PK set, it also forces frequent
> hash-function rebuilds.
>  # *Skew tuning.* Users want to switch strategies based on their data shape 
> (by table vs. by PK vs. by chosen columns) without writing and registering a 
> custom {{HashFunctionProvider}} per sink.
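> The first point above can be checked with a small experiment. Assuming a PK hash built
> like {{List.hashCode()}} over {{(tableId, pk)}} and modulo routing (both simplifying
> assumptions, not the exact Flink CDC code; {{TableSpreadDemo}} is hypothetical), eight rows
> of one table spread over all four subtasks, while hashing by table keeps them on one:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical experiment: which subtasks receive rows of a single table? */
final class TableSpreadDemo {
    private TableSpreadDemo() {}

    static int hashByPrimaryKey(String tableId, int pk) {
        return Arrays.<Object>asList(tableId, pk).hashCode();
    }

    static int hashByTableId(String tableId) {
        return tableId.hashCode();
    }

    /** Returns the distinct subtasks hit by rows with primary keys 1..rows. */
    static Set<Integer> subtasksUsed(String tableId, int rows, int parallelism, boolean byTableId) {
        Set<Integer> used = new TreeSet<>();
        for (int pk = 1; pk <= rows; pk++) {
            int hash = byTableId ? hashByTableId(tableId) : hashByPrimaryKey(tableId, pk);
            used.add(Math.floorMod(hash, parallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("PRIMARY_KEY: " + subtasksUsed("shop.orders", 8, 4, false)); // [0, 1, 2, 3]
        System.out.println("TABLE_ID:    " + subtasksUsed("shop.orders", 8, 4, true).size()); // 1
    }
}
```

> Because this particular PK hash grows by exactly one per key, consecutive keys walk
> through every subtask. Real hash functions scatter less regularly, but a table with many
> keys typically still lands on many subtasks, and events on different subtasks carry no
> mutual ordering guarantee.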
> Switching strategies today requires implementing a new
> {{HashFunctionProvider}} and returning it from
> {{DataSink#getDataChangeEventHashFunctionProvider}}. That is a high bar
> for users, and the choice cannot be driven by pipeline-level configuration.
> h2. Current State
>  * {{RegularPrePartitionOperator}} takes a
> {{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one
> {{HashFunction}} instance per {{TableId}} at runtime
> ({{flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java}}).
>  * The provider flows through the composer:
> {{PartitioningTranslator#translateRegular}} receives it from upstream, and it
> ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}}
> ({{flink-cdc-composer/.../translator/PartitioningTranslator.java}},
> {{flink-cdc-common/.../sink/DataSink.java}}).
>  * Default behavior: hash by {{TableId + primary keys}}
> ({{DefaultDataChangeEventHashFunctionProvider}}).
>  * The same wiring exists in {{BatchRegularPrePartitionOperator}} and
> {{DistributedPrePartitionOperator}}, so the gap is identical there.
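> The "one {{HashFunction}} per {{TableId}}" caching can be pictured as a lazily filled map
> that is invalidated when a table's schema changes, which is consistent with the rebuilds
> mentioned under "changing primary keys". This is a minimal sketch with hypothetical names:
> tables are strings and the provider is modeled as a factory function, not the real
> {{HashFunctionProvider}} interface:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical per-table cache of hash functions, modeled on the operator's behavior. */
final class HashFunctionCache<E> {
    // Stand-in for HashFunctionProvider: builds a hash function for one table.
    private final Function<String, ToIntFunction<E>> provider;
    private final Map<String, ToIntFunction<E>> cache = new HashMap<>();
    private int builds;

    HashFunctionCache(Function<String, ToIntFunction<E>> provider) {
        this.provider = provider;
    }

    /** Hashes an event, building the table's function on first use only. */
    int hash(String tableId, E event) {
        ToIntFunction<E> fn = cache.computeIfAbsent(tableId, id -> {
            builds++;
            return provider.apply(id);
        });
        return fn.applyAsInt(event);
    }

    /** Drops the cached function, e.g. after a schema change; the next event rebuilds it. */
    void invalidate(String tableId) {
        cache.remove(tableId);
    }

    int builds() {
        return builds;
    }

    public static void main(String[] args) {
        HashFunctionCache<String> cache = new HashFunctionCache<>(id -> event -> id.hashCode());
        cache.hash("shop.orders", "insert#1");
        cache.hash("shop.orders", "insert#2");
        cache.invalidate("shop.orders"); // schema change on shop.orders
        cache.hash("shop.orders", "insert#3");
        System.out.println(cache.builds()); // 2
    }
}
```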
> *Pain points:*
>  * No out-of-the-box "hash by TableId" strategy.
>  * No pipeline-level configuration knob; users have to change code.
>  * Each sink connector reimplements its own provider, scattering the logic.
> h2. Proposed Design
> Introduce a "partitioning strategy" abstraction in the runtime/common layer
> so that {{RegularPrePartitionOperator}} (and
> {{BatchRegularPrePartitionOperator}},
> {{DistributedPrePartitionOperator}}) can select the appropriate
> {{HashFunctionProvider}} from pipeline configuration.
> h3. 1. Introduce {{HashFunctionStrategy}} enum
> Add {{org.apache.flink.cdc.common.function.HashFunctionStrategy}}:
> {code:java}
> package org.apache.flink.cdc.common.function;
>
> /** Strategy used to compute the partitioning hash of a DataChangeEvent. */
> public enum HashFunctionStrategy {
>     /** Hash by TableId + primary keys (preserves today's default behavior). */
>     PRIMARY_KEY,
>     /** Hash by TableId only: all events of the same table land on the same subtask. */
>     TABLE_ID
> }
> {code}
> The enum is designed to grow ({{ROUND_ROBIN}}, {{COLUMNS}}, ...), but
> this issue ships only {{PRIMARY_KEY}} and {{TABLE_ID}}.
>  
> h3. 2. Provide a {{HashFunctionProvider}} per strategy
>  * {{PRIMARY_KEY}}: reuse the existing
> {{DefaultDataChangeEventHashFunctionProvider}}.
>  * {{TABLE_ID}}: add {{TableIdHashFunctionProvider}}, which hashes
> solely on {{TableId}} (namespace / schema / table) and never reads the record
> payload.
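> A minimal sketch of what {{TableIdHashFunctionProvider}} could look like follows. The
> interfaces are simplified stand-ins that only assume the general shape "a provider builds
> a per-table hash function"; the real {{HashFunctionProvider}} / {{HashFunction}}
> signatures and the {{TableId}} class in flink-cdc-common should be checked before reusing
> anything. All type names below are hypothetical. The hash is computed once from
> namespace, schema and table name, and the event is never inspected:

```java
import java.io.Serializable;
import java.util.Objects;

/** Simplified stand-in for Flink CDC's TableId (namespace and schema may be null). */
final class SimpleTableId implements Serializable {
    final String namespace;
    final String schemaName;
    final String tableName;

    SimpleTableId(String namespace, String schemaName, String tableName) {
        this.namespace = namespace;
        this.schemaName = schemaName;
        this.tableName = Objects.requireNonNull(tableName, "tableName");
    }
}

/** Assumed shape of a hash function: one int per event. */
interface SketchHashFunction<E> {
    int hashcode(E event);
}

/** Assumed shape of a provider: builds a hash function for a given table. */
interface SketchHashFunctionProvider<E> extends Serializable {
    SketchHashFunction<E> getHashFunction(SimpleTableId tableId);
}

/** Hypothetical TABLE_ID strategy: every event of a table gets the same hash. */
final class TableIdHashFunctionProviderSketch<E> implements SketchHashFunctionProvider<E> {
    private static final long serialVersionUID = 1L;

    @Override
    public SketchHashFunction<E> getHashFunction(SimpleTableId tableId) {
        // Objects.hash over Strings is deterministic across JVMs because
        // String.hashCode() is specified; identity hash codes would not be.
        final int hash = Objects.hash(tableId.namespace, tableId.schemaName, tableId.tableName);
        return event -> hash; // the event payload is deliberately ignored
    }
}

final class TableIdHashDemo {
    public static void main(String[] args) {
        SketchHashFunctionProvider<String> provider = new TableIdHashFunctionProviderSketch<>();
        SketchHashFunction<String> fn = provider.getHashFunction(new SimpleTableId(null, "shop", "orders"));
        System.out.println(fn.hashcode("INSERT id=1") == fn.hashcode("DELETE id=99")); // true
    }
}
```

> A real implementation could equally delegate to {{TableId#hashCode}}, provided that
> method is deterministic across JVMs.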
> h3. 3. Expose a pipeline configuration option
> Add a {{pipeline.*}} option (see below) and read it in
> {{FlinkPipelineComposer}} / {{PartitioningTranslator}}:
>  * When the option is set explicitly, use the corresponding provider.
>  * When it is unset, fall back to 
> {{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is 
> fully preserved.
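> The selection rule fits in one small function. In this sketch the providers are generic
> values and "unset" is modeled as an empty {{Optional}}; {{ProviderSelector}} and
> {{select}} are hypothetical and do not correspond to existing {{FlinkPipelineComposer}}
> or {{PartitioningTranslator}} code:

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Mirrors the proposed HashFunctionStrategy enum. */
enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

/** Hypothetical selection logic for the composer/translator wiring. */
final class ProviderSelector {
    private ProviderSelector() {}

    static <P> P select(
            Optional<HashFunctionStrategy> configured,
            Supplier<P> sinkProvider, // stands in for DataSink#getDataChangeEventHashFunctionProvider()
            P primaryKeyProvider,
            P tableIdProvider) {
        if (!configured.isPresent()) {
            return sinkProvider.get(); // unset: keep today's behavior exactly
        }
        switch (configured.get()) {
            case PRIMARY_KEY:
                return primaryKeyProvider; // explicit choice overrides the sink
            case TABLE_ID:
                return tableIdProvider; // explicit choice overrides the sink
            default:
                throw new IllegalStateException("Unhandled strategy: " + configured.get());
        }
    }

    public static void main(String[] args) {
        Supplier<String> sink = () -> "sink-provider";
        System.out.println(select(Optional.empty(), sink, "pk-provider", "table-provider")); // sink-provider
        System.out.println(select(Optional.of(HashFunctionStrategy.TABLE_ID), sink, "pk-provider", "table-provider")); // table-provider
    }
}
```

> Taking the sink provider as a {{Supplier}} means the sink is only consulted when the
> option is unset.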
> h3. 4. Scope
> Apply to:
>  * {{RegularPrePartitionOperator}}
>  * {{BatchRegularPrePartitionOperator}}
>  * {{DistributedPrePartitionOperator}}
> The change is symmetric across the three operators; the provider is injected in
> {{PartitioningTranslator}}.
> Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}}
> is tied to cross-partition upsert and bucket modes. It continues to
> pick its own provider and is *out of scope* for this issue.
> h2. Configuration
> Add a new pipeline option:
> ||Key||Type||Default||Description||
> |{{partitioning.hash-function-strategy}}|enum|_(unset: falls back to the sink-provided provider)_|Partitioning strategy. Allowed values: {{PRIMARY_KEY}}, {{TABLE_ID}}. When set, this overrides the {{HashFunctionProvider}} returned by the sink.|
> Behavior:
>  * *Unset*: behavior is identical to today;
> {{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
>  * {{PRIMARY_KEY}}: forces
> {{DefaultDataChangeEventHashFunctionProvider}}, ignoring any
> sink-provided provider.
>  * {{TABLE_ID}}: uses {{TableIdHashFunctionProvider}}, ignoring any
> sink-provided provider.
>  * *Invalid value*: composer-level validation fails fast at job
> submission with a clear error message.
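> Fail-fast validation amounts to parsing the raw option value once at submission time and
> rejecting anything that is not an enum constant, with an error that names the option and
> the allowed values. A minimal sketch, assuming exact (case-sensitive) matching; the
> parser class is hypothetical, and in Flink CDC the option would more likely be declared as
> a typed config option than parsed by hand:

```java
import java.util.Arrays;
import java.util.Optional;

/** Mirrors the proposed HashFunctionStrategy enum. */
enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

/** Hypothetical parser for partitioning.hash-function-strategy. */
final class HashFunctionStrategyOption {
    static final String KEY = "partitioning.hash-function-strategy";

    private HashFunctionStrategyOption() {}

    /** Returns empty when the option is absent; throws on any unknown value. */
    static Optional<HashFunctionStrategy> parse(String rawValue) {
        if (rawValue == null) {
            return Optional.empty(); // unset: fall back to the sink-provided provider
        }
        for (HashFunctionStrategy strategy : HashFunctionStrategy.values()) {
            if (strategy.name().equals(rawValue)) {
                return Optional.of(strategy);
            }
        }
        throw new IllegalArgumentException(String.format(
                "Invalid value '%s' for pipeline option '%s'. Allowed values: %s.",
                rawValue, KEY, Arrays.toString(HashFunctionStrategy.values())));
    }

    public static void main(String[] args) {
        System.out.println(parse("TABLE_ID")); // Optional[TABLE_ID]
        try {
            parse("BY_TABLE");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```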
> YAML example:
>  
> {code:yaml}
> pipeline:
>   name: my-cdc-job
>   partitioning.hash-function-strategy: TABLE_ID
> {code}
>  
> h2. Compatibility
>  * *Configuration*: the option is additive and optional. When unset,
> behavior is identical to today's.
>  * *API*: no breaking changes to {{HashFunctionProvider}} or
> {{DataSink#getDataChangeEventHashFunctionProvider}}.
> {{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
>  * *State*: the {{HashFunction}} cache is {{transient}} and not part of
> any checkpoint. Switching strategies requires a job restart but does *not*
> require state cleanup.
>  * *Downstream consistency*: switching between {{PRIMARY_KEY}} and
> {{TABLE_ID}} (in either direction) changes the subtask that events for a given key land on.
> Switching on a job that is already processing data can cause transient
> cross-subtask reordering for the same key, so the documentation must present
> the switch as a "restart from a clean checkpoint" change.
>  * *Override semantics for sink-supplied providers*: when the user sets
> the option explicitly, the sink's own provider is bypassed. The documentation
> must call this out (e.g. for Paimon's runtime path, overriding may degrade
> bucket-routing optimality).
> h2. Acceptance Criteria
> h3. Functional
>  * Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and
> {{TABLE_ID}}.
>  * Add {{TableIdHashFunctionProvider}}. Hash is derived from {{TableId}}
> only and is stable for the same {{TableId}}.
>  * {{PartitioningTranslator}} / composer wiring picks a provider based on
> {{HashFunctionStrategy}}; falls back to the sink-provided provider when
> unset.
>  * The change applies to {{RegularPrePartitionOperator}},
> {{BatchRegularPrePartitionOperator}}, and
> {{DistributedPrePartitionOperator}}.
> h3. Configuration
>  * New {{partitioning.hash-function-strategy}} option, configurable via YAML 
> and threaded through composer to translator.
>  * Invalid values produce a clear error during job startup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
