[
https://issues.apache.org/jira/browse/FLINK-39777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yanquan Lv reassigned FLINK-39777:
----------------------------------
Assignee: Mao Jiayi
> Support configurable HashFunction strategies in PrePartitionOperator
> --------------------------------------------------------------------
>
> Key: FLINK-39777
> URL: https://issues.apache.org/jira/browse/FLINK-39777
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Reporter: Yanquan Lv
> Assignee: Mao Jiayi
> Priority: Major
>
> h2. Background
> In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between
> {{SchemaOperator}} and {{EventPartitioner}}. For every
> {{DataChangeEvent}}, it computes a hash that decides which downstream
> subtask the event is routed to.
> Today, the hashing behavior is fully owned by {{HashFunctionProvider}}:
> * The default implementation {{DefaultDataChangeEventHashFunctionProvider}}
> hashes by {{TableId + primary keys}} (see
> {{flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java}}).
> * Sink connectors can ship their own implementation (e.g.
> {{PaimonHashFunctionProvider}}, {{MaxComputeHashFunctionProvider}},
> {{FlussHashFunctionProvider}}).
> In real-world deployments, "hash by primary key" is not always the right
> strategy:
> # *All events of one table should land on the same subtask.* Some sinks want
> strict per-table ordering (single-writer-per-table semantics,
> single-partition writes, downstream parallelism organized by table). Hashing
> by primary key spreads a single table across subtasks and can introduce
> cross-subtask reordering.
> # *No / changing primary key.* When a table has no primary key, or when a
> schema change touches the PK set, hashing by primary key forces frequent
> hash-function rebuilds and may not even produce a better distribution than
> hashing by table.
> # *Skew tuning.* Users want to switch strategies based on their data shape
> (by table vs. by PK vs. by chosen columns) without writing and registering a
> custom {{HashFunctionProvider}} per sink.
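> To make the first point concrete, here is a minimal sketch of how the two
> strategies route rows of one table. It assumes the downstream subtask is
> chosen as the hash modulo the parallelism; {{RoutingSketch}} and its methods
> are hypothetical names for illustration, not Flink CDC classes.

```java
import java.util.Arrays;

/** Illustrative only: hypothetical helper, not part of Flink CDC. */
final class RoutingSketch {

    /** Maps a hash to a subtask index; floorMod keeps the result non-negative. */
    static int subtaskFor(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism);
    }

    /** Hash by table only: every row of the table yields the same hash. */
    static int hashByTable(String tableId) {
        return tableId.hashCode();
    }

    /** Hash by table plus primary key: rows of one table can yield different hashes. */
    static int hashByPrimaryKey(String tableId, Object... primaryKey) {
        return 31 * tableId.hashCode() + Arrays.hashCode(primaryKey);
    }
}
```

> With a parallelism of 4, rows of {{db.orders}} with keys 0 to 3 all share one
> subtask under the table-only hash, while the primary-key hash spreads them
> over several subtasks.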
> Switching strategies today requires implementing a new
> {{HashFunctionProvider}} and returning it from
> {{DataSink#getDataChangeEventHashFunctionProvider}}. That is a high bar
> for users, and the choice cannot be driven by pipeline-level configuration.
> h2. Current State
> * {{RegularPrePartitionOperator}} takes a
> {{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one
> {{HashFunction}} instance per {{TableId}} at runtime
> ({{flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java}}).
> * The provider flows through the composer:
> {{PartitioningTranslator#translateRegular}} receives it from upstream, and it
> ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}}
> ({{flink-cdc-composer/.../translator/PartitioningTranslator.java}},
> {{flink-cdc-common/.../sink/DataSink.java}}).
> * Default behavior: hash by {{TableId + primary keys}}
> ({{DefaultDataChangeEventHashFunctionProvider}}).
> * The same wiring exists in {{BatchRegularPrePartitionOperator}} and
> {{DistributedPrePartitionOperator}}, so the gap is identical there.
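> The per-table caching described above can be pictured roughly as follows.
> This is a simplified sketch, not the operator's actual code: the table is
> identified by a plain string, the provider is modeled as a
> {{java.util.function.Function}}, and {{HashFunctionCacheSketch}} and
> {{onSchemaChange}} are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Illustrative only: caches one hash function per table and rebuilds it on demand. */
final class HashFunctionCacheSketch<E> {

    // Stand-in for the provider: builds a hash function for a given table.
    private final Function<String, ToIntFunction<E>> provider;

    // One cached hash function per table; in the operator this cache is transient.
    private final Map<String, ToIntFunction<E>> cache = new HashMap<>();

    HashFunctionCacheSketch(Function<String, ToIntFunction<E>> provider) {
        this.provider = provider;
    }

    /** Hashes an event, asking the provider only on the first event of a table. */
    int hash(String tableId, E event) {
        return cache.computeIfAbsent(tableId, provider).applyAsInt(event);
    }

    /** Drops the cached function so the next event of the table triggers a rebuild. */
    void onSchemaChange(String tableId) {
        cache.remove(tableId);
    }
}
```

> A strategy that ignores the schema, such as hashing by table only, would make
> the rebuild after a schema change a cheap no-op in practice.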
> *Pain points:*
> * No out-of-the-box "hash by TableId" strategy.
> * No pipeline-level configuration knob; users have to change code.
> * Each sink connector reimplements its own provider, scattering the logic.
> h2. Proposed Design
> Introduce a "partitioning strategy" abstraction in the runtime/common layer
> so that {{RegularPrePartitionOperator}} (and
> {{BatchRegularPrePartitionOperator}},
> {{DistributedPrePartitionOperator}}) can select the appropriate
> {{HashFunctionProvider}} from pipeline configuration.
> h3. 1. Introduce {{HashFunctionStrategy}} enum
> Add {{org.apache.flink.cdc.common.function.HashFunctionStrategy}}:
> {code:java}
> public enum HashFunctionStrategy {
>     /** Hash by TableId + primary keys (preserves today's default behavior). */
>     PRIMARY_KEY,
>     /** Hash by TableId only: all events of the same table land on the same subtask. */
>     TABLE_ID;
> }
> {code}
> The enum is designed to grow ({{ROUND_ROBIN}}, {{COLUMNS}}, ...), but
> this issue ships only {{PRIMARY_KEY}} and {{TABLE_ID}}.
>
> h3. 2. Provide a {{HashFunctionProvider}} per strategy
> * {{PRIMARY_KEY}}: reuse the existing
> {{DefaultDataChangeEventHashFunctionProvider}}.
> * {{TABLE_ID}}: add {{TableIdHashFunctionProvider}}, which hashes
> solely on {{TableId}} (namespace / schema / table) and never reads the record
> payload.
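> A rough sketch of what the {{TABLE_ID}} provider could look like. To stay
> self-contained it uses hypothetical stand-in types ({{TableIdSketch}},
> {{HashFunctionSketch}}, {{HashFunctionProviderSketch}}) rather than the real
> Flink CDC interfaces, whose exact signatures are not assumed here.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical stand-in for a table identifier (namespace / schema / table). */
final class TableIdSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final String namespace;   // may be null for sources without namespaces
    final String schemaName;
    final String tableName;

    TableIdSketch(String namespace, String schemaName, String tableName) {
        this.namespace = namespace;
        this.schemaName = schemaName;
        this.tableName = tableName;
    }
}

/** Hypothetical stand-in for a per-table hash function. */
interface HashFunctionSketch<T> {
    int hashcode(T event);
}

/** Hypothetical stand-in for the provider abstraction. */
interface HashFunctionProviderSketch<T> extends Serializable {
    HashFunctionSketch<T> getHashFunction(TableIdSketch tableId);
}

/** Hashes on the table identifier only; the event payload is never inspected. */
final class TableIdHashFunctionProviderSketch<T> implements HashFunctionProviderSketch<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public HashFunctionSketch<T> getHashFunction(TableIdSketch tableId) {
        // Computed once per table; Objects.hash tolerates a null namespace.
        final int hash = Objects.hash(tableId.namespace, tableId.schemaName, tableId.tableName);
        return event -> hash;
    }
}
```

> Because the hash is derived from the three name parts alone, it is the same
> for every event of a table and does not depend on the table's schema.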
> h3. 3. Expose a pipeline configuration option
> Add a {{pipeline.*}} option (see below) and read it in
> {{FlinkPipelineComposer}} / {{PartitioningTranslator}}:
> * When the option is set explicitly, use the corresponding provider.
> * When it is unset, fall back to
> {{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is
> fully preserved.
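> The selection rule can be sketched as below. The provider type is left
> generic and all names ({{StrategySketch}}, {{ProviderSelectionSketch}},
> {{select}}) are hypothetical; the real wiring in the composer may differ.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical mirror of the proposed strategy enum. */
enum StrategySketch { PRIMARY_KEY, TABLE_ID }

/** Illustrative only: picks a provider from the optional pipeline setting. */
final class ProviderSelectionSketch {

    static <P> P select(
            Optional<StrategySketch> configured,
            Supplier<P> primaryKeyProvider,
            Supplier<P> tableIdProvider,
            Supplier<P> sinkProvider) {
        // Unset: keep today's behavior and defer to the sink.
        if (!configured.isPresent()) {
            return sinkProvider.get();
        }
        // Set explicitly: the sink-provided provider is bypassed.
        switch (configured.get()) {
            case PRIMARY_KEY:
                return primaryKeyProvider.get();
            case TABLE_ID:
                return tableIdProvider.get();
            default:
                throw new IllegalStateException("Unhandled strategy: " + configured.get());
        }
    }
}
```

> Using suppliers keeps the sink's provider from being created at all when the
> option overrides it, which is one possible design choice rather than a
> requirement of the proposal.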
> h3. 4. Scope
> Apply to:
> * {{RegularPrePartitionOperator}}
> * {{BatchRegularPrePartitionOperator}}
> * {{DistributedPrePartitionOperator}}
> The change is symmetric across the three; injection happens in
> {{PartitioningTranslator}}.
> Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}}
> is tied to Paimon's cross-partition upsert and bucket modes. It continues to
> pick its own provider and is *out of scope* for this issue.
> h2. Configuration
> Add a new pipeline option:
> ||Key||Type||Default||Description||
> |{{partitioning.hash-function-strategy}}|enum|_(unset; falls back to the sink-provided provider)_|Partitioning strategy. Allowed values: {{PRIMARY_KEY}}, {{TABLE_ID}}. When set, this overrides the {{HashFunctionProvider}} returned by the sink.|
> Behavior:
> * *Unset*: behavior is identical to today;
> {{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
> * {{PRIMARY_KEY}}: forces
> {{DefaultDataChangeEventHashFunctionProvider}}, ignoring any
> sink-provided provider.
> * {{TABLE_ID}}: uses {{TableIdHashFunctionProvider}}, ignoring any
> sink-provided provider.
> * *Invalid value*: composer-level validation fails fast at job
> submission with a clear error message.
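> One way the fail-fast validation might look is sketched below. The names
> {{HashStrategyOption}} and {{StrategyOptionParser}} are hypothetical, and
> both the case-insensitive matching and the treatment of a blank value as
> unset are assumptions of this sketch, not part of the proposal.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;

/** Hypothetical mirror of the proposed strategy enum. */
enum HashStrategyOption { PRIMARY_KEY, TABLE_ID }

/** Illustrative only: parses the raw option value and rejects unknown values early. */
final class StrategyOptionParser {

    static final String KEY = "partitioning.hash-function-strategy";

    /** Returns empty when the option is unset, otherwise the parsed strategy. */
    static Optional<HashStrategyOption> parse(String rawValue) {
        // Assumption: null or blank means the user did not set the option.
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            // Assumption: values are matched case-insensitively.
            String normalized = rawValue.trim().toUpperCase(Locale.ROOT);
            return Optional.of(HashStrategyOption.valueOf(normalized));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Invalid value '" + rawValue + "' for option '" + KEY
                            + "'. Allowed values: "
                            + Arrays.toString(HashStrategyOption.values()),
                    e);
        }
    }
}
```

> Calling {{parse}} while the pipeline definition is being composed would
> surface a typo before any job is submitted.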
> YAML example:
>
> {code:yaml}
> pipeline:
>   name: my-cdc-job
>   partitioning.hash-function-strategy: TABLE_ID
> {code}
>
> h2. Compatibility
> * *Configuration*: the option is additive and optional. When unset,
> behavior is byte-for-byte identical to today.
> * *API*: no breaking changes to {{HashFunctionProvider}} or
> {{DataSink#getDataChangeEventHashFunctionProvider}}.
> {{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
> * *State*: the {{HashFunction}} cache is {{transient}} and not part of
> any checkpoint. Switching strategies requires a job restart but does *not*
> require state cleanup.
> * *Downstream consistency*: switching between {{PRIMARY_KEY}} and
> {{TABLE_ID}} (in either direction) changes where events for a given key land.
> Switching mid-flight on a running job can cause transient cross-subtask
> reordering for the same key. This must be documented as a "restart from a
> clean checkpoint" change.
> * *Override semantics for sink-supplied providers*: when the user sets
> the option explicitly, the sink's own provider is bypassed. Documentation
> must call this out (e.g. for Paimon's runtime path, overriding may degrade
> bucket-routing optimality).
> h2. Acceptance Criteria
> h3. Functional
> * Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and
> {{TABLE_ID}}.
> * Add {{TableIdHashFunctionProvider}}. Hash is derived from {{TableId}}
> only and is stable for the same {{TableId}}.
> * {{PartitioningTranslator}} / composer wiring picks a provider based on
> {{HashFunctionStrategy}}; falls back to the sink-provided provider when
> unset.
> * The change applies to {{RegularPrePartitionOperator}},
> {{BatchRegularPrePartitionOperator}}, and
> {{DistributedPrePartitionOperator}}.
> h3. Configuration
> * New {{partitioning.hash-function-strategy}} option, configurable via YAML
> and threaded through composer to translator.
> * Invalid values produce a clear error at job submission (composer-level
> validation), before the job starts running.