Yanquan Lv created FLINK-39777:
----------------------------------
Summary: Support configurable HashFunction strategies in PrePartitionOperator
Key: FLINK-39777
URL: https://issues.apache.org/jira/browse/FLINK-39777
Project: Flink
Issue Type: New Feature
Reporter: Yanquan Lv
h2. Background
In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between
{{SchemaOperator}} and {{EventPartitioner}}. For every {{DataChangeEvent}}, it
computes a hash that decides which downstream subtask the event is routed to.
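To make the routing step concrete, the sketch below assumes the hash is reduced to a subtask index with a non-negative modulo. {{SubtaskRouting}} and {{subtaskFor}} are hypothetical names, and the exact arithmetic used in Flink CDC may differ.

```java
/** Hypothetical helper: maps an event hash to a downstream subtask index. */
final class SubtaskRouting {
    private SubtaskRouting() {}

    /**
     * Reduces an arbitrary int hash to a subtask index in [0, parallelism).
     * Math.floorMod keeps the result non-negative even for negative hashes,
     * which a plain '%' would not.
     */
    static int subtaskFor(int hash, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        return Math.floorMod(hash, parallelism);
    }
}
```

For example, {{subtaskFor(-7, 4)}} yields 1, whereas a plain {{-7 % 4}} gives -3, which is not a valid subtask index.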
Today, the hashing behavior is fully owned by {{HashFunctionProvider}}:
* The default implementation, {{DefaultDataChangeEventHashFunctionProvider}},
hashes by {{TableId + primary keys}} (see
{{flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java}}).
* Sink connectors can ship their own implementation (e.g.
{{PaimonHashFunctionProvider}}, {{MaxComputeHashFunctionProvider}},
{{FlussHashFunctionProvider}}).
In real-world deployments, "hash by primary key" is not always the right
strategy:
# *All events of one table should land on the same subtask.* Some sinks want
strict per-table ordering (single-writer-per-table semantics, single-partition
writes, downstream parallelism organized by table). Hashing by primary key
spreads a single table across subtasks and can introduce cross-subtask
reordering.
# *Missing or changing primary keys.* When a table has no primary key, or when a
schema change alters the primary-key columns, hashing by primary key forces
frequent hash-function rebuilds and may not even produce a better distribution
than hashing by table.
# *Skew tuning.* Users want to switch strategies based on their data shape (by
table vs. by PK vs. by chosen columns) without writing and registering a custom
{{HashFunctionProvider}} per sink.
Switching strategies today requires implementing a new {{HashFunctionProvider}}
and returning it from {{DataSink#getDataChangeEventHashFunctionProvider()}}.
That is a high bar for users, and it cannot be driven by pipeline-level
configuration.
h2. Current State
* {{RegularPrePartitionOperator}} takes a
{{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one
{{HashFunction}} instance per {{TableId}} at runtime
({{flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java}}).
* The provider flows through the composer:
{{PartitioningTranslator#translateRegular}} receives it from upstream, and it
ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}}
({{flink-cdc-composer/.../translator/PartitioningTranslator.java}},
{{flink-cdc-common/.../sink/DataSink.java}}).
* Default behavior: hash by {{TableId + primary keys}}
({{DefaultDataChangeEventHashFunctionProvider}}).
* The same wiring exists in {{BatchRegularPrePartitionOperator}} and
{{DistributedPrePartitionOperator}}, so the gap is identical there.
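The per-{{TableId}} caching described above can be pictured as a lazily filled map. This is a minimal sketch under stated assumptions: {{HashFunctionCache}} is a hypothetical name, plain {{java.util.function}} types stand in for {{HashFunctionProvider}} and {{HashFunction}}, and rebuilding after a schema change is modeled as evicting the cached entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical per-table cache of hash functions, keyed by a table identifier. */
final class HashFunctionCache<K, E> {
    // Builds a hash function for one table; stands in for HashFunctionProvider.
    private final Function<K, ToIntFunction<E>> provider;
    // In the operator this cache is transient and not checkpointed (per this issue);
    // here it is simply a plain field.
    private final Map<K, ToIntFunction<E>> cache = new HashMap<>();
    // Counts how often the provider was asked to build a function.
    private int builds = 0;

    HashFunctionCache(Function<K, ToIntFunction<E>> provider) {
        this.provider = provider;
    }

    /** Hashes an event with its table's function, building that function on first use. */
    int hash(K tableId, E event) {
        ToIntFunction<E> fn = cache.computeIfAbsent(tableId, id -> {
            builds++;
            return provider.apply(id);
        });
        return fn.applyAsInt(event);
    }

    /** Drops the cached function, e.g. after a schema change on that table. */
    void invalidate(K tableId) {
        cache.remove(tableId);
    }

    int builds() {
        return builds;
    }
}
```

Each table triggers one build until its entry is evicted, which is why frequent primary-key changes translate into frequent rebuilds.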
*Pain points:*
* No out-of-the-box "hash by TableId" strategy.
* No pipeline-level configuration knob; users have to change code.
* Each sink connector reimplements its own provider, scattering the logic.
h2. Proposed Design
Introduce a "partitioning strategy" abstraction in the runtime/common layer so
that {{RegularPrePartitionOperator}} (as well as
{{BatchRegularPrePartitionOperator}} and
{{DistributedPrePartitionOperator}}) can select the appropriate
{{HashFunctionProvider}} from pipeline configuration.
h3. 1. Introduce {{HashFunctionStrategy}} enum
Add {{org.apache.flink.cdc.common.function.HashFunctionStrategy}}:
{code:java}
package org.apache.flink.cdc.common.function;

public enum HashFunctionStrategy {
    /** Hash by TableId + primary keys (preserves today's default behavior). */
    PRIMARY_KEY,
    /** Hash by TableId only: all events of the same table land on the same subtask. */
    TABLE_ID
}
{code}
The enum is designed to grow ({{ROUND_ROBIN}}, {{COLUMNS}}, ...), but
this issue ships only {{PRIMARY_KEY}} and {{TABLE_ID}}.
h3. 2. Provide a {{HashFunctionProvider}} per strategy
* {{PRIMARY_KEY}}: reuse the existing
{{DefaultDataChangeEventHashFunctionProvider}}.
* {{TABLE_ID}}: add {{TableIdHashFunctionProvider}}, which hashes
solely on {{TableId}} (namespace / schema / table) and never reads the record
payload.
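A {{TABLE_ID}} provider could look roughly like the sketch below. To keep it self-contained, {{TableId}}, {{Event}}, {{HashFunction}}, and {{HashFunctionProvider}} are simplified, hypothetical stand-ins whose shapes are assumptions (the real Flink CDC signatures may differ); only the idea of hashing namespace / schema / table while ignoring the payload comes from this proposal.

```java
import java.io.Serializable;
import java.util.Objects;

/** Simplified, hypothetical stand-ins for the Flink CDC types. */
final class TableIdHashing {
    private TableIdHashing() {}

    /** Hypothetical table identifier; namespace and schema may be null. */
    record TableId(String namespace, String schemaName, String tableName) implements Serializable {}

    /** Hypothetical event carrying its table and an opaque payload. */
    record Event(TableId tableId, Object payload) {}

    interface HashFunction<T> extends Serializable {
        int hashcode(T event);
    }

    interface HashFunctionProvider<T> extends Serializable {
        HashFunction<T> getHashFunction(TableId tableId);
    }

    /** Sketch of the TABLE_ID strategy: every event of a table gets the same hash. */
    static final class TableIdHashFunctionProvider implements HashFunctionProvider<Event> {
        @Override
        public HashFunction<Event> getHashFunction(TableId tableId) {
            // The payload is never read; only the event's table identity feeds the hash.
            return event -> hashOf(event.tableId());
        }
    }

    /**
     * Objects.hash over the String parts has a value fully specified by the Java
     * platform (unlike identity hash codes), so the same table always hashes the same way.
     */
    static int hashOf(TableId id) {
        return Objects.hash(id.namespace(), id.schemaName(), id.tableName());
    }
}
```

Because the hash depends only on the three name parts, it is stable for the same {{TableId}} across restarts and JVMs, which is the property the acceptance criteria ask for.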
h3. 3. Expose a pipeline configuration option
Add a {{pipeline.*}} option (see below) and read it in
{{FlinkPipelineComposer}} / {{PartitioningTranslator}}:
* When the option is set explicitly, use the corresponding provider.
* When it is unset, fall back to
{{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is
fully preserved.
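The resolution rule above (an explicit option wins, otherwise the sink's provider is used) fits in a few lines. This sketch is generic over the provider type so that it runs on its own; {{ProviderResolution}} and {{resolve}} are hypothetical names, not composer code.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical resolution logic; names are illustrative, not the Flink CDC API. */
final class ProviderResolution {
    private ProviderResolution() {}

    enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

    /**
     * Picks the provider for the pre-partition operator. An explicit strategy
     * always wins; otherwise the sink's own provider is used, which keeps
     * today's behavior when the option is unset.
     */
    static <P> P resolve(Optional<HashFunctionStrategy> configured,
                         Supplier<P> sinkProvider,
                         P primaryKeyProvider,
                         P tableIdProvider) {
        if (configured.isEmpty()) {
            return sinkProvider.get(); // unset: defer to the sink
        }
        switch (configured.get()) {
            case PRIMARY_KEY:
                return primaryKeyProvider;
            case TABLE_ID:
                return tableIdProvider;
            default:
                throw new IllegalStateException("Unhandled strategy: " + configured.get());
        }
    }
}
```

Taking the sink provider as a {{Supplier}} means the sink is only consulted when the option is unset; whether the real composer defers this call is an assumption.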
h3. 4. Scope
Apply to:
* {{RegularPrePartitionOperator}}
* {{BatchRegularPrePartitionOperator}}
* {{DistributedPrePartitionOperator}}
The change is symmetric across the three; injection happens in
{{PartitioningTranslator}}.
Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}}
handles cross-partition upsert and bucket modes. It continues to pick its own
provider and is *out of scope* for this issue.
h2. Configuration
Add a new pipeline option:
||Key||Type||Default||Description||
|{{pipeline.partitioning.hash-function-strategy}}|enum|_(unset; falls back to the sink-provided provider)_|Partitioning strategy. Allowed values: {{PRIMARY_KEY}}, {{TABLE_ID}}. When set, this overrides the {{HashFunctionProvider}} returned by the sink.|
Behavior:
* *Unset*: behavior is identical to today;
{{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
* {{PRIMARY_KEY}}: forces
{{DefaultDataChangeEventHashFunctionProvider}}, ignoring any sink-provided
provider.
* {{TABLE_ID}}: uses {{TableIdHashFunctionProvider}}, ignoring any
sink-provided provider.
* *Invalid value*: composer-level validation fails fast at job submission
with a clear error message.
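Fail-fast validation of the option value could be sketched as follows. {{StrategyOption}} and {{parse}} are hypothetical, and the whitespace-trimming, case-insensitive matching is an assumption rather than something this issue specifies; in the actual pipeline the value would presumably be parsed by the configuration framework.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical option parsing; case-insensitive matching is an assumption. */
final class StrategyOption {
    private StrategyOption() {}

    enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

    static final String KEY = "pipeline.partitioning.hash-function-strategy";

    /** Empty when the option is unset (null); throws on anything that is not a known value. */
    static Optional<HashFunctionStrategy> parse(String raw) {
        if (raw == null) {
            return Optional.empty(); // unset: the sink's provider will be used
        }
        String normalized = raw.trim().toUpperCase(Locale.ROOT);
        for (HashFunctionStrategy strategy : HashFunctionStrategy.values()) {
            if (strategy.name().equals(normalized)) {
                return Optional.of(strategy);
            }
        }
        // List the allowed values so the error is actionable at submission time.
        String allowed = Arrays.stream(HashFunctionStrategy.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
        throw new IllegalArgumentException(String.format(
                "Invalid value '%s' for option '%s'. Allowed values: %s.", raw, KEY, allowed));
    }
}
```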
YAML example:
{code:yaml}
pipeline:
  name: my-cdc-job
  partitioning:
    hash-function-strategy: TABLE_ID
{code}
h2. Compatibility
* *Configuration*: the option is additive and optional. When unset,
behavior is exactly the same as today.
* *API*: no breaking changes to {{HashFunctionProvider}} or
{{DataSink#getDataChangeEventHashFunctionProvider}}.
{{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
* *State*: the {{HashFunction}} cache is {{transient}} and not part of any
checkpoint. Switching strategies requires a job restart but does *not* require
state cleanup.
* *Downstream consistency*: switching between {{PRIMARY_KEY}} and
{{TABLE_ID}} (in either direction) changes which subtask the events for a given
key land on. Restarting an existing job with a different strategy can therefore
cause transient cross-subtask reordering for the same key. This must be
documented as a "restart from a clean checkpoint" change.
* *Override semantics for sink-supplied providers*: when the user sets the
option explicitly, the sink's own provider is bypassed. Documentation must call
this out (e.g. for Paimon's runtime path, overriding may degrade bucket-routing
optimality).
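To illustrate the downstream-consistency note above, the sketch below counts the subtasks that 100 keys of one table reach under each strategy. The hash formulas are assumed stand-ins ({{Objects.hash}} over the table name, plus the key for {{PRIMARY_KEY}}), not the actual provider implementations, and {{StrategyComparison}} is a hypothetical name.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical comparison of where one table's keys land under each strategy. */
final class StrategyComparison {
    private StrategyComparison() {}

    /** Assumed stand-in for PRIMARY_KEY hashing: table name plus key. */
    static int primaryKeyHash(String table, int key) {
        return Objects.hash(table, key);
    }

    /** Assumed stand-in for TABLE_ID hashing: table name only. */
    static int tableIdHash(String table) {
        return Objects.hash(table);
    }

    /** Collects the distinct subtasks hit by keys [0, keys) of one table. */
    static Set<Integer> subtasks(String table, int keys, int parallelism, boolean byTableId) {
        Set<Integer> hit = new TreeSet<>();
        for (int k = 0; k < keys; k++) {
            int hash = byTableId ? tableIdHash(table) : primaryKeyHash(table, k);
            hit.add(Math.floorMod(hash, parallelism));
        }
        return hit;
    }
}
```

With parallelism 4, the {{PRIMARY_KEY}}-style hash reaches all 4 subtasks while the {{TABLE_ID}}-style hash reaches exactly one, so every key whose primary-key subtask differs from the table's single subtask (75 of the 100 here) is rerouted when the strategy is switched.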
h2. Acceptance Criteria
h3. Functional
* Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and {{TABLE_ID}}.
* Add {{TableIdHashFunctionProvider}}. Hash is derived from {{TableId}}
only and is stable for the same {{TableId}}.
* {{PartitioningTranslator}} / composer wiring picks a provider based on
{{HashFunctionStrategy}}; falls back to the sink-provided provider when
unset.
* The change applies to {{RegularPrePartitionOperator}},
{{BatchRegularPrePartitionOperator}}, and
{{DistributedPrePartitionOperator}}.
h3. Configuration
* New {{pipeline.partitioning.hash-function-strategy}} option, configurable
via YAML and threaded through the composer to the translator.
* Invalid values fail fast with a clear error at job submission.