Yanquan Lv created FLINK-39777:
----------------------------------

             Summary: Support configurable HashFunction strategies in PrePartitionOperator
                 Key: FLINK-39777
                 URL: https://issues.apache.org/jira/browse/FLINK-39777
             Project: Flink
          Issue Type: New Feature
            Reporter: Yanquan Lv


h2. Background

In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between {{SchemaOperator}} and {{EventPartitioner}}. For every {{DataChangeEvent}}, it computes a hash that determines which downstream subtask receives the event.

Today, the hashing behavior is owned entirely by {{HashFunctionProvider}}:
 * The default implementation, {{DefaultDataChangeEventHashFunctionProvider}}, hashes by {{TableId + primary keys}} (see {{flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java}}).
 * Sink connectors can ship their own implementation (e.g. {{PaimonHashFunctionProvider}}, {{MaxComputeHashFunctionProvider}}, {{FlussHashFunctionProvider}}).
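To make the default strategy concrete, here is a minimal sketch of "hash by TableId + primary keys". It is not the code of {{DefaultDataChangeEventHashFunctionProvider}}: the {{TableId}} stand-in, the row-as-{{Map}} representation, and the modulo-based subtask selection are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified model of "hash by TableId + primary keys". */
final class PrimaryKeyHashSketch {

    /** Simplified stand-in for Flink CDC's TableId (namespace / schema / table). */
    static final class TableId {
        final String namespace;
        final String schema;
        final String table;

        TableId(String namespace, String schema, String table) {
            this.namespace = namespace;
            this.schema = schema;
            this.table = table;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TableId)) {
                return false;
            }
            TableId other = (TableId) o;
            return Objects.equals(namespace, other.namespace)
                    && Objects.equals(schema, other.schema)
                    && Objects.equals(table, other.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, schema, table);
        }
    }

    /** Combines the table identity with the values of the primary-key columns only. */
    static int hash(TableId tableId, List<String> primaryKeys, Map<String, Object> row) {
        Object[] parts = new Object[primaryKeys.size() + 1];
        parts[0] = tableId;
        for (int i = 0; i < primaryKeys.size(); i++) {
            parts[i + 1] = row.get(primaryKeys.get(i));
        }
        return Arrays.hashCode(parts);
    }

    /** Maps a hash to a subtask index; floorMod keeps negative hashes in range. */
    static int subtask(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism);
    }

    public static void main(String[] args) {
        TableId orders = new TableId(null, "db", "orders");
        List<String> pk = Arrays.asList("id");
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42L);
        row.put("amount", 9.99);
        System.out.println("subtask = " + subtask(hash(orders, pk, row), 4));
    }
}
```

Because only primary-key values enter the hash, updates to non-key columns of the same row keep routing to the same subtask.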

In real-world deployments, "hash by primary key" is not always the right strategy:
 # *All events of one table should land on the same subtask.* Some sinks want strict per-table ordering (single-writer-per-table semantics, single-partition writes, downstream parallelism organized by table). Hashing by primary key spreads a single table across subtasks and can introduce cross-subtask reordering.
 # *No / changing primary key.* When a table has no primary key, or when a schema change touches the PK set, hashing by primary key forces frequent hash-function rebuilds and may not even produce a better distribution than hashing by table.
 # *Skew tuning.* Users want to switch strategies based on their data shape (by table vs. by PK vs. by chosen columns) without writing and registering a custom {{HashFunctionProvider}} per sink.
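The first pain point can be illustrated with a small, hypothetical experiment: route 100 rows of one table to 4 subtasks under each strategy and count how many subtasks the table touches. The string table name and the {{Objects.hash}}-based routing are simplifications, not the connector code.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical comparison of how many subtasks one table touches under each strategy. */
final class TableSpreadSketch {

    /** PRIMARY_KEY-style routing: table identity plus the key value. */
    static int byPrimaryKey(String tableId, Object primaryKey, int parallelism) {
        return Math.floorMod(Objects.hash(tableId, primaryKey), parallelism);
    }

    /** TABLE_ID-style routing: the key value is ignored. */
    static int byTableId(String tableId, int parallelism) {
        return Math.floorMod(tableId.hashCode(), parallelism);
    }

    /** Counts the distinct subtasks used by rows with primary keys 0..rows-1 of one table. */
    static int subtasksUsed(String tableId, int rows, int parallelism, boolean usePrimaryKey) {
        Set<Integer> used = new HashSet<>();
        for (int pk = 0; pk < rows; pk++) {
            used.add(usePrimaryKey
                    ? byPrimaryKey(tableId, pk, parallelism)
                    : byTableId(tableId, parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        System.out.println("PRIMARY_KEY: " + subtasksUsed("db.orders", 100, 4, true));  // 4
        System.out.println("TABLE_ID:    " + subtasksUsed("db.orders", 100, 4, false)); // 1
    }
}
```

With consecutive integer keys, the primary-key hash visits every subtask, so ordering across the table's rows is no longer confined to one subtask; the table-only hash pins the whole table to a single subtask.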

Switching strategies today requires implementing a new {{HashFunctionProvider}} and returning it from {{DataSink#getDataChangeEventHashFunctionProvider}}. That is a high bar for users, and it cannot be driven by pipeline-level configuration.
h2. Current State
 * {{RegularPrePartitionOperator}} takes a {{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one {{HashFunction}} instance per {{TableId}} at runtime ({{flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java}}).
 * The provider flows through the composer: {{PartitioningTranslator#translateRegular}} receives it from upstream, and it ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}} ({{flink-cdc-composer/.../translator/PartitioningTranslator.java}}, {{flink-cdc-common/.../sink/DataSink.java}}).
 * Default behavior: hash by {{TableId + primary keys}} ({{DefaultDataChangeEventHashFunctionProvider}}).
 * The same wiring exists in {{BatchRegularPrePartitionOperator}} and {{DistributedPrePartitionOperator}}, so the same gap exists there.
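The per-{{TableId}} caching described above can be pictured as follows. This is a hypothetical sketch, not {{RegularPrePartitionOperator}}'s code: the {{Provider}} interface, string table identifiers, and the {{invalidate}} hook (assumed to be called when a schema change touches the table's primary key) are illustrative stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

/**
 * Hypothetical sketch of a per-table hash-function cache. Tables are plain strings and
 * events are generic so that the example stays self-contained.
 */
final class HashFunctionCacheSketch<E> {

    /** Stand-in for HashFunctionProvider: builds one hash function per table. */
    interface Provider<T> {
        ToIntFunction<T> create(String tableId);
    }

    private final Provider<E> provider;
    // Plain in-memory state: nothing in this map is checkpointed.
    private final Map<String, ToIntFunction<E>> cache = new HashMap<>();
    private int builds; // number of hash functions created (for illustration only)

    HashFunctionCacheSketch(Provider<E> provider) {
        this.provider = provider;
    }

    /** Looks up (or lazily builds) the table's hash function and applies it to the event. */
    int hash(String tableId, E event) {
        ToIntFunction<E> fn = cache.get(tableId);
        if (fn == null) {
            fn = provider.create(tableId);
            cache.put(tableId, fn);
            builds++;
        }
        return fn.applyAsInt(event);
    }

    /** Drops the cached function, e.g. after a schema change that alters the table's primary key. */
    void invalidate(String tableId) {
        cache.remove(tableId);
    }

    int builds() {
        return builds;
    }
}
```

Because such a cache is ordinary in-memory state, a restarted job rebuilds it from whatever provider is configured at that point.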

*Pain points:*
 * No out-of-the-box "hash by TableId" strategy.
 * No pipeline-level configuration knob; users have to change code.
 * Each sink connector reimplements its own provider, scattering the logic.

h2. Proposed Design

Introduce a "partitioning strategy" abstraction in the runtime/common layer so that {{RegularPrePartitionOperator}} (and {{BatchRegularPrePartitionOperator}}, {{DistributedPrePartitionOperator}}) can select the appropriate {{HashFunctionProvider}} from pipeline configuration.
h3. 1. Introduce {{HashFunctionStrategy}} enum

Add {{org.apache.flink.cdc.common.function.HashFunctionStrategy}}:
{code:java}
public enum HashFunctionStrategy {
    /** Hash by TableId + primary keys (preserves today's default behavior). */
    PRIMARY_KEY,

    /** Hash by TableId only: all events of the same table land on the same subtask. */
    TABLE_ID
}
{code}
The enum is designed to grow ({{ROUND_ROBIN}}, {{COLUMNS}}, ...), but this issue ships only {{PRIMARY_KEY}} and {{TABLE_ID}}.
h3. 2. Provide a {{HashFunctionProvider}} per strategy
 * {{PRIMARY_KEY}}: reuse the existing {{DefaultDataChangeEventHashFunctionProvider}}.
 * {{TABLE_ID}}: add {{TableIdHashFunctionProvider}}, which hashes solely on {{TableId}} (namespace / schema / table) and never reads the record payload.
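A minimal sketch of the TABLE_ID strategy, assuming simplified stand-ins for {{TableId}} and {{DataChangeEvent}}. The class and method names below are hypothetical and do not reproduce Flink CDC's {{HashFunctionProvider}} interface.

```java
import java.util.Objects;

/** Hypothetical sketch of the TABLE_ID strategy using simplified stand-in types. */
final class TableIdHashSketch {

    /** Simplified stand-in for Flink CDC's TableId; namespace and schema may be null. */
    static final class TableId {
        final String namespace;
        final String schema;
        final String table;

        TableId(String namespace, String schema, String table) {
            this.namespace = namespace;
            this.schema = schema;
            this.table = Objects.requireNonNull(table, "table");
        }
    }

    /** Simplified stand-in for a DataChangeEvent: a table identity plus an opaque payload. */
    static final class Event {
        final TableId tableId;
        final Object payload;

        Event(TableId tableId, Object payload) {
            this.tableId = tableId;
            this.payload = payload;
        }
    }

    /** One instance per table, matching a per-TableId cache in the operator. */
    static final class TableIdHashFunction {
        private final int hash;

        TableIdHashFunction(TableId tableId) {
            // Computed once from the name parts only. String.hashCode is specified by the
            // Java API, so equal TableIds yield the same value in every JVM.
            this.hash = Objects.hash(tableId.namespace, tableId.schema, tableId.table);
        }

        /** Ignores the payload entirely: every event of the table gets the same hash. */
        int hash(Event event) {
            return hash;
        }
    }

    /** Stand-in for the provider's factory method; TABLE_ID needs no schema information. */
    static TableIdHashFunction getHashFunction(TableId tableId) {
        return new TableIdHashFunction(tableId);
    }
}
```

Computing the hash once per table keeps the per-event cost constant, and relying on {{String.hashCode}} keeps the value stable for the same {{TableId}} across restarts.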

h3. 3. Expose a pipeline configuration option

Add a {{pipeline.*}} option (see Configuration below) and read it in {{FlinkPipelineComposer}} / {{PartitioningTranslator}}:
 * When the option is set explicitly, use the corresponding provider.
 * When it is unset, fall back to {{DataSink#getDataChangeEventHashFunctionProvider()}} so that existing behavior is fully preserved.
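The selection rule can be expressed as a small resolution function. This is a hypothetical sketch: the {{Provider}} interface and the two lambdas stand in for {{HashFunctionProvider}}, {{DefaultDataChangeEventHashFunctionProvider}}, and {{TableIdHashFunctionProvider}}, and where exactly it runs ({{FlinkPipelineComposer}} or {{PartitioningTranslator}}) is left open, as in the proposal.

```java
import java.util.Objects;

/** Hypothetical sketch of resolving the effective provider from the configured strategy. */
final class ProviderResolutionSketch {

    enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

    /** Simplified stand-in for HashFunctionProvider: routes by table and primary key. */
    interface Provider {
        int hash(String tableId, Object primaryKey);
    }

    static final Provider PRIMARY_KEY_PROVIDER = (tableId, pk) -> Objects.hash(tableId, pk);
    static final Provider TABLE_ID_PROVIDER = (tableId, pk) -> tableId.hashCode();

    /**
     * @param configured the parsed option value, or null when the option is unset
     * @param sinkProvider the provider returned by the sink
     */
    static Provider resolve(HashFunctionStrategy configured, Provider sinkProvider) {
        if (configured == null) {
            return sinkProvider; // unset: keep today's behavior
        }
        switch (configured) {
            case PRIMARY_KEY:
                return PRIMARY_KEY_PROVIDER; // overrides any sink-supplied provider
            case TABLE_ID:
                return TABLE_ID_PROVIDER; // overrides any sink-supplied provider
            default:
                throw new IllegalStateException("Unhandled strategy: " + configured);
        }
    }
}
```

Representing "unset" as {{null}} (rather than defaulting to {{PRIMARY_KEY}}) is what keeps sink-supplied providers in effect when the user does not opt in.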

h3. 4. Scope

Apply to:
 * {{RegularPrePartitionOperator}}
 * {{BatchRegularPrePartitionOperator}}
 * {{DistributedPrePartitionOperator}}

The change is symmetric across the three; injection happens in {{PartitioningTranslator}}.
Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}} is tied to cross-partition upsert and bucket modes. It continues to pick its own provider and is *out of scope* for this issue.
h2. Configuration

Add a new pipeline option:
||Key||Type||Default||Description||
|{{pipeline.partitioning.hash-function-strategy}}|enum|_(unset; falls back to the sink-provided provider)_|Partitioning strategy. Allowed values: {{PRIMARY_KEY}}, {{TABLE_ID}}. When set, this overrides the {{HashFunctionProvider}} returned by the sink.|

Behavior:
 * *Unset*: behavior is identical to today; {{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
 * {{PRIMARY_KEY}}: forces {{DefaultDataChangeEventHashFunctionProvider}}, ignoring any sink-provided provider.
 * {{TABLE_ID}}: uses {{TableIdHashFunctionProvider}}, ignoring any sink-provided provider.
 * *Invalid value*: composer-level validation fails fast at job submission with a clear error message.
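The fail-fast validation could look like this minimal sketch. The enum mirrors the proposed one; treating blank values as unset and matching case-insensitively are assumptions of the sketch, not part of the proposal.

```java
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical fail-fast parsing of the proposed option value. */
final class StrategyOptionParsing {

    enum HashFunctionStrategy { PRIMARY_KEY, TABLE_ID }

    static final String KEY = "pipeline.partitioning.hash-function-strategy";

    /**
     * Returns null when the option is unset (so the sink's provider is used) and throws an
     * IllegalArgumentException naming the key and the allowed values when the value is unknown.
     */
    static HashFunctionStrategy parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        try {
            // Case-insensitive matching is an assumption of this sketch.
            return HashFunctionStrategy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    String.format(
                            "Invalid value '%s' for option '%s'. Allowed values: %s.",
                            raw, KEY, Arrays.toString(HashFunctionStrategy.values())),
                    e);
        }
    }
}
```

For example, {{parse("ROUND_ROBIN")}} throws with the message {{Invalid value 'ROUND_ROBIN' for option 'pipeline.partitioning.hash-function-strategy'. Allowed values: [PRIMARY_KEY, TABLE_ID].}}, so the mistake surfaces at submission rather than as unexpected routing at runtime.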

YAML example:
{code:yaml}
pipeline:
  name: my-cdc-job
  partitioning:
    hash-function-strategy: TABLE_ID
{code}
h2. Compatibility
 * *Configuration*: the option is additive and optional. When it is unset, behavior is identical to today.
 * *API*: no breaking changes to {{HashFunctionProvider}} or {{DataSink#getDataChangeEventHashFunctionProvider}}. {{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
 * *State*: the {{HashFunction}} cache is {{transient}} and not part of any checkpoint. Switching strategies requires a job restart but does *not* require state cleanup.
 * *Downstream consistency*: switching between {{PRIMARY_KEY}} and {{TABLE_ID}} (in either direction) changes which subtask receives the events for a given key. Changing the strategy while events for that key are still in flight can cause transient cross-subtask reordering for the key, so the change must be documented as requiring a restart from a clean checkpoint.
 * *Override semantics for sink-supplied providers*: when the user sets the option explicitly, the sink's own provider is bypassed. The documentation must call this out (e.g. for Paimon's runtime path, overriding may degrade bucket-routing optimality).

h2. Acceptance Criteria
h3. Functional
 * Add the {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and {{TABLE_ID}}.
 * Add {{TableIdHashFunctionProvider}}. The hash is derived from {{TableId}} only and is stable for the same {{TableId}}.
 * {{PartitioningTranslator}} / composer wiring picks a provider based on {{HashFunctionStrategy}} and falls back to the sink-provided provider when the option is unset.
 * The change applies to {{RegularPrePartitionOperator}}, {{BatchRegularPrePartitionOperator}}, and {{DistributedPrePartitionOperator}}.

h3. Configuration
 * New {{pipeline.partitioning.hash-function-strategy}} option, configurable via YAML and threaded through the composer to the translator.
 * Invalid values produce a clear error at job submission.


