[ 
https://issues.apache.org/jira/browse/FLINK-39777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanquan Lv updated FLINK-39777:
-------------------------------
    Description: 
h2. Background

In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between 
{{SchemaOperator}} and {{{}EventPartitioner{}}}. For every 
{{{}DataChangeEvent{}}}, it computes a hash that decides which downstream 
subtask the event is routed to.

Today, the hashing behavior is fully owned by {{{}HashFunctionProvider{}}}:
 * The default implementation {{DefaultDataChangeEventHashFunctionProvider}} 
hashes by {{TableId + primary keys}} (see 
{{{}flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java{}}}).
 * Sink connectors can ship their own implementation (e.g. 
{{{}PaimonHashFunctionProvider{}}}, {{{}MaxComputeHashFunctionProvider{}}}, 
{{{}FlussHashFunctionProvider{}}}).

In real-world deployments, "hash by primary key" is not always the right 
strategy:
 # *All events of one table should land on the same subtask.* Some sinks want 
strict per-table ordering (single-writer-per-table semantics, single-partition 
writes, downstream parallelism organized by table). Hashing by primary key 
spreads a single table across subtasks and can introduce cross-subtask 
reordering.
 # *No / changing primary key.* When a table has no primary key, or when a 
schema change touches the PK set, hashing by primary key forces frequent 
hash-function rebuilds and may not even produce a better distribution than 
hashing by table.
 # *Skew tuning.* Users want to switch strategies based on their data shape (by 
table vs. by PK vs. by chosen columns) without writing and registering a custom 
{{HashFunctionProvider}} per sink.

Switching strategies today requires implementing a new {{HashFunctionProvider}} 
and returning it from {{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. 
That is a high bar for users and cannot be driven by pipeline-level 
configuration.
h2. Current State
 * {{RegularPrePartitionOperator}} takes a 
{{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one 
{{HashFunction}} instance per {{TableId}} at runtime 
({{{}flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java{}}}).
 * The provider flows through the composer: 
{{PartitioningTranslator#translateRegular}} receives it from upstream, and it 
ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}} 
({{{}flink-cdc-composer/.../translator/PartitioningTranslator.java{}}}, 
{{{}flink-cdc-common/.../sink/DataSink.java{}}}).
 * Default behavior: hash by {{TableId + primary keys}} 
({{{}DefaultDataChangeEventHashFunctionProvider{}}}).
 * The same wiring exists in {{BatchRegularPrePartitionOperator}} and 
{{{}DistributedPrePartitionOperator{}}}, so the gap is identical there.

*Pain points:*
 * No out-of-the-box "hash by TableId" strategy.
 * No pipeline-level configuration knob; users have to change code.
 * Each sink connector reimplements its own provider, scattering the logic.

h2. Proposed Design

Introduce a "partitioning strategy" abstraction in the runtime/common layer so 
that {{RegularPrePartitionOperator}} (and 
{{{}BatchRegularPrePartitionOperator{}}}, 
{{{}DistributedPrePartitionOperator{}}}) can select the appropriate 
{{HashFunctionProvider}} from pipeline configuration.
h3. 1. Introduce {{HashFunctionStrategy}} enum

Add {{{}org.apache.flink.cdc.common.function.HashFunctionStrategy{}}}:
{code:java}
public enum HashFunctionStrategy {
/** Hash by TableId + primary keys (preserves today's default behavior). */
PRIMARY_KEY,
/** Hash by TableId only — all events of the same table land on the same 
subtask. */
TABLE_ID;
} {code}
The enum is designed to grow ({{{}ROUND_ROBIN{}}}, {{{}COLUMNS{}}}, ...), but 
this issue ships only {{PRIMARY_KEY}} and {{{}TABLE_ID{}}}.

 
h3. 2. Provide a {{HashFunctionProvider}} per strategy
 * {{{}PRIMARY_KEY{}}}: reuse the existing 
{{{}DefaultDataChangeEventHashFunctionProvider{}}}.
 * {{{}TABLE_ID{}}}: add {{{}TableIdHashFunctionProvider{}}}, which hashes 
solely on {{TableId}} (namespace / schema / table) and never reads the record 
payload.

h3. 3. Expose a pipeline configuration option

Add a {{pipeline.*}} option (see below) and read it in 
{{FlinkPipelineComposer}} / {{{}PartitioningTranslator{}}}:
 * When the option is set explicitly, use the corresponding provider.
 * When it is unset, fall back to 
{{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is 
fully preserved.

h3. 4. Scope

Apply to:
 * {{RegularPrePartitionOperator}}
 * {{BatchRegularPrePartitionOperator}}
 * {{DistributedPrePartitionOperator}}

The change is symmetric across the three; injection happens in 
{{{}PartitioningTranslator{}}}.
Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}} 
is involved with cross-partition upsert and bucket modes. It continues to pick 
its own provider and is *out of scope* for this issue.
h2. Configuration

Add a new pipeline option:
|Key|Type|Default|Description|
|{{partitioning.hash-function-strategy}}|enum|_(unset — falls back to the 
sink-provided provider)_|Partitioning strategy. Allowed values: 
{{{}PRIMARY_KEY{}}}, {{{}TABLE_ID{}}}. When set, this overrides the 
{{HashFunctionProvider}} returned by the sink.|

Behavior:
 * {*}Unset{*}: behavior is identical to today — 
{{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
 * {{{}PRIMARY_KEY{}}}: forces 
{{{}DefaultDataChangeEventHashFunctionProvider{}}}, ignoring any sink-provided 
provider.
 * {{{}TABLE_ID{}}}: uses {{{}TableIdHashFunctionProvider{}}}, ignoring any 
sink-provided provider.
 * {*}Invalid value{*}: composer-level validation fails fast at job submission 
with a clear error message.

YAML example:

 
{code:java}
pipeline:
  name: my-cdc-job
  partitioning.hash-function-strategy: TABLE_ID {code}
 
h2. Compatibility
 * {*}Configuration{*}: the option is additive and optional. When unset, 
behavior is byte-for-byte identical to today.
 * {*}API{*}: no breaking changes to {{HashFunctionProvider}} or 
{{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. 
{{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
 * {*}State{*}: the {{HashFunction}} cache is {{transient}} and not part of any 
checkpoint. Switching strategies requires a job restart but does *not* require 
state cleanup.
 * {*}Downstream consistency{*}: switching between {{PRIMARY_KEY}} and 
{{TABLE_ID}} (in either direction) changes where events for a given key land. 
Switching mid-flight on a running job can cause transient cross-subtask 
reordering for the same key. This must be documented as a "restart from a clean 
checkpoint" change.
 * {*}Override semantics for sink-supplied providers{*}: when the user sets the 
option explicitly, the sink's own provider is bypassed. Documentation must call 
this out (e.g. for Paimon's runtime path, overriding may degrade bucket-routing 
optimality).

h2. Acceptance Criteria
h3. Functional
 * Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and {{{}TABLE_ID{}}}.
 * Add {{{}TableIdHashFunctionProvider{}}}. Hash is derived from {{TableId}} 
only and is stable for the same {{{}TableId{}}}.
 * {{PartitioningTranslator}} / composer wiring picks a provider based on 
{{{}HashFunctionStrategy{}}}; falls back to the sink-provided provider when 
unset.
 * The change applies to {{{}RegularPrePartitionOperator{}}}, 
{{{}BatchRegularPrePartitionOperator{}}}, and 
{{{}DistributedPrePartitionOperator{}}}.

h3. Configuration
 * New {{partitioning.hash-function-strategy}} option, configurable via YAML 
and threaded through composer to translator.
 * Invalid values produce a clear error during job startup.

  was:
h2. Background

In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between 
{{SchemaOperator}} and {{{}EventPartitioner{}}}. For every 
{{{}DataChangeEvent{}}}, it computes a hash that decides which downstream 
subtask the event is routed to.

Today, the hashing behavior is fully owned by {{{}HashFunctionProvider{}}}:
 * The default implementation {{DefaultDataChangeEventHashFunctionProvider}} 
hashes by {{TableId + primary keys}} (see 
{{{}flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java{}}}).
 * Sink connectors can ship their own implementation (e.g. 
{{{}PaimonHashFunctionProvider{}}}, {{{}MaxComputeHashFunctionProvider{}}}, 
{{{}FlussHashFunctionProvider{}}}).

In real-world deployments, "hash by primary key" is not always the right 
strategy:
 # *All events of one table should land on the same subtask.* Some sinks want 
strict per-table ordering (single-writer-per-table semantics, single-partition 
writes, downstream parallelism organized by table). Hashing by primary key 
spreads a single table across subtasks and can introduce cross-subtask 
reordering.
 # *No / changing primary key.* When a table has no primary key, or when a 
schema change touches the PK set, hashing by primary key forces frequent 
hash-function rebuilds and may not even produce a better distribution than 
hashing by table.
 # *Skew tuning.* Users want to switch strategies based on their data shape (by 
table vs. by PK vs. by chosen columns) without writing and registering a custom 
{{HashFunctionProvider}} per sink.

Switching strategies today requires implementing a new {{HashFunctionProvider}} 
and returning it from {{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. 
That is a high bar for users and cannot be driven by pipeline-level 
configuration.
h2. Current State
 * {{RegularPrePartitionOperator}} takes a 
{{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one 
{{HashFunction}} instance per {{TableId}} at runtime 
({{{}flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java{}}}).
 * The provider flows through the composer: 
{{PartitioningTranslator#translateRegular}} receives it from upstream, and it 
ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}} 
({{{}flink-cdc-composer/.../translator/PartitioningTranslator.java{}}}, 
{{{}flink-cdc-common/.../sink/DataSink.java{}}}).
 * Default behavior: hash by {{TableId + primary keys}} 
({{{}DefaultDataChangeEventHashFunctionProvider{}}}).
 * The same wiring exists in {{BatchRegularPrePartitionOperator}} and 
{{{}DistributedPrePartitionOperator{}}}, so the gap is identical there.

*Pain points:*
 * No out-of-the-box "hash by TableId" strategy.
 * No pipeline-level configuration knob; users have to change code.
 * Each sink connector reimplements its own provider, scattering the logic.

h2. Proposed Design

Introduce a "partitioning strategy" abstraction in the runtime/common layer so 
that {{RegularPrePartitionOperator}} (and 
{{{}BatchRegularPrePartitionOperator{}}}, 
{{{}DistributedPrePartitionOperator{}}}) can select the appropriate 
{{HashFunctionProvider}} from pipeline configuration.
h3. 1. Introduce {{HashFunctionStrategy}} enum

Add {{{}org.apache.flink.cdc.common.function.HashFunctionStrategy{}}}:
public enum HashFunctionStrategy \{
    /** Hash by TableId + primary keys (preserves today's default behavior). */
    PRIMARY_KEY,

    /** Hash by TableId only — all events of the same table land on the same 
subtask. */
    TABLE_ID;
}
The enum is designed to grow ({{{}ROUND_ROBIN{}}}, {{{}COLUMNS{}}}, ...), but 
this issue ships only {{PRIMARY_KEY}} and {{{}TABLE_ID{}}}.
h3. 2. Provide a {{HashFunctionProvider}} per strategy
 * {{{}PRIMARY_KEY{}}}: reuse the existing 
{{{}DefaultDataChangeEventHashFunctionProvider{}}}.
 * {{{}TABLE_ID{}}}: add {{{}TableIdHashFunctionProvider{}}}, which hashes 
solely on {{TableId}} (namespace / schema / table) and never reads the record 
payload.

h3. 3. Expose a pipeline configuration option

Add a {{pipeline.*}} option (see below) and read it in 
{{FlinkPipelineComposer}} / {{{}PartitioningTranslator{}}}:
 * When the option is set explicitly, use the corresponding provider.
 * When it is unset, fall back to 
{{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is 
fully preserved.

h3. 4. Scope

Apply to:
 * {{RegularPrePartitionOperator}}
 * {{BatchRegularPrePartitionOperator}}
 * {{DistributedPrePartitionOperator}}

The change is symmetric across the three; injection happens in 
{{{}PartitioningTranslator{}}}.
Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}} 
is involved with cross-partition upsert and bucket modes. It continues to pick 
its own provider and is *out of scope* for this issue.
h2. Configuration

Add a new pipeline option:
|Key|Type|Default|Description|
|{{pipeline.partitioning.hash-function-strategy}}|enum|_(unset — falls back to 
the sink-provided provider)_|Partitioning strategy. Allowed values: 
{{{}PRIMARY_KEY{}}}, {{{}TABLE_ID{}}}. When set, this overrides the 
{{HashFunctionProvider}} returned by the sink.|

Behavior:
 * {*}Unset{*}: behavior is identical to today — 
{{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
 * {{{}PRIMARY_KEY{}}}: forces 
{{{}DefaultDataChangeEventHashFunctionProvider{}}}, ignoring any sink-provided 
provider.
 * {{{}TABLE_ID{}}}: uses {{{}TableIdHashFunctionProvider{}}}, ignoring any 
sink-provided provider.
 * {*}Invalid value{*}: composer-level validation fails fast at job submission 
with a clear error message.

YAML example:
pipeline:
  name: my-cdc-job
  partitioning:
    hash-function-strategy: TABLE_ID
h2. Compatibility
 * {*}Configuration{*}: the option is additive and optional. When unset, 
behavior is byte-for-byte identical to today.
 * {*}API{*}: no breaking changes to {{HashFunctionProvider}} or 
{{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. 
{{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
 * {*}State{*}: the {{HashFunction}} cache is {{transient}} and not part of any 
checkpoint. Switching strategies requires a job restart but does *not* require 
state cleanup.
 * {*}Downstream consistency{*}: switching between {{PRIMARY_KEY}} and 
{{TABLE_ID}} (in either direction) changes where events for a given key land. 
Switching mid-flight on a running job can cause transient cross-subtask 
reordering for the same key. This must be documented as a "restart from a clean 
checkpoint" change.
 * {*}Override semantics for sink-supplied providers{*}: when the user sets the 
option explicitly, the sink's own provider is bypassed. Documentation must call 
this out (e.g. for Paimon's runtime path, overriding may degrade bucket-routing 
optimality).

h2. Acceptance Criteria
h3. Functional
 * Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and {{{}TABLE_ID{}}}.
 * Add {{{}TableIdHashFunctionProvider{}}}. Hash is derived from {{TableId}} 
only and is stable for the same {{{}TableId{}}}.
 * {{PartitioningTranslator}} / composer wiring picks a provider based on 
{{{}HashFunctionStrategy{}}}; falls back to the sink-provided provider when 
unset.
 * The change applies to {{{}RegularPrePartitionOperator{}}}, 
{{{}BatchRegularPrePartitionOperator{}}}, and 
{{{}DistributedPrePartitionOperator{}}}.

h3. Configuration
 * New {{pipeline.partitioning.hash-function-strategy}} option, configurable 
via YAML and threaded through composer to translator.
 * Invalid values produce a clear error during job startup.


> Support configurable HashFunction strategies in PrePartitionOperator
> --------------------------------------------------------------------
>
>                 Key: FLINK-39777
>                 URL: https://issues.apache.org/jira/browse/FLINK-39777
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>            Reporter: Yanquan Lv
>            Priority: Major
>
> h2. Background
> In a Flink CDC pipeline, {{RegularPrePartitionOperator}} sits between 
> {{SchemaOperator}} and {{{}EventPartitioner{}}}. For every 
> {{{}DataChangeEvent{}}}, it computes a hash that decides which downstream 
> subtask the event is routed to.
> Today, the hashing behavior is fully owned by {{{}HashFunctionProvider{}}}:
>  * The default implementation {{DefaultDataChangeEventHashFunctionProvider}} 
> hashes by {{TableId + primary keys}} (see 
> {{{}flink-cdc-common/.../sink/DefaultDataChangeEventHashFunctionProvider.java{}}}).
>  * Sink connectors can ship their own implementation (e.g. 
> {{{}PaimonHashFunctionProvider{}}}, {{{}MaxComputeHashFunctionProvider{}}}, 
> {{{}FlussHashFunctionProvider{}}}).
> In real-world deployments, "hash by primary key" is not always the right 
> strategy:
>  # *All events of one table should land on the same subtask.* Some sinks want 
> strict per-table ordering (single-writer-per-table semantics, 
> single-partition writes, downstream parallelism organized by table). Hashing 
> by primary key spreads a single table across subtasks and can introduce 
> cross-subtask reordering.
>  # *No / changing primary key.* When a table has no primary key, or when a 
> schema change touches the PK set, hashing by primary key forces frequent 
> hash-function rebuilds and may not even produce a better distribution than 
> hashing by table.
>  # *Skew tuning.* Users want to switch strategies based on their data shape 
> (by table vs. by PK vs. by chosen columns) without writing and registering a 
> custom {{HashFunctionProvider}} per sink.
> Switching strategies today requires implementing a new 
> {{HashFunctionProvider}} and returning it from 
> {{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. That is a high bar 
> for users and cannot be driven by pipeline-level configuration.
> h2. Current State
>  * {{RegularPrePartitionOperator}} takes a 
> {{HashFunctionProvider<DataChangeEvent>}} via its constructor and caches one 
> {{HashFunction}} instance per {{TableId}} at runtime 
> ({{{}flink-cdc-runtime/.../partitioning/RegularPrePartitionOperator.java{}}}).
>  * The provider flows through the composer: 
> {{PartitioningTranslator#translateRegular}} receives it from upstream, and it 
> ultimately comes from {{DataSink#getDataChangeEventHashFunctionProvider()}} 
> ({{{}flink-cdc-composer/.../translator/PartitioningTranslator.java{}}}, 
> {{{}flink-cdc-common/.../sink/DataSink.java{}}}).
>  * Default behavior: hash by {{TableId + primary keys}} 
> ({{{}DefaultDataChangeEventHashFunctionProvider{}}}).
>  * The same wiring exists in {{BatchRegularPrePartitionOperator}} and 
> {{{}DistributedPrePartitionOperator{}}}, so the gap is identical there.
> *Pain points:*
>  * No out-of-the-box "hash by TableId" strategy.
>  * No pipeline-level configuration knob; users have to change code.
>  * Each sink connector reimplements its own provider, scattering the logic.
> h2. Proposed Design
> Introduce a "partitioning strategy" abstraction in the runtime/common layer 
> so that {{RegularPrePartitionOperator}} (and 
> {{{}BatchRegularPrePartitionOperator{}}}, 
> {{{}DistributedPrePartitionOperator{}}}) can select the appropriate 
> {{HashFunctionProvider}} from pipeline configuration.
> h3. 1. Introduce {{HashFunctionStrategy}} enum
> Add {{{}org.apache.flink.cdc.common.function.HashFunctionStrategy{}}}:
> {code:java}
> public enum HashFunctionStrategy {
> /** Hash by TableId + primary keys (preserves today's default behavior). */
> PRIMARY_KEY,
> /** Hash by TableId only — all events of the same table land on the same 
> subtask. */
> TABLE_ID;
> } {code}
> The enum is designed to grow ({{{}ROUND_ROBIN{}}}, {{{}COLUMNS{}}}, ...), but 
> this issue ships only {{PRIMARY_KEY}} and {{{}TABLE_ID{}}}.
>  
> h3. 2. Provide a {{HashFunctionProvider}} per strategy
>  * {{{}PRIMARY_KEY{}}}: reuse the existing 
> {{{}DefaultDataChangeEventHashFunctionProvider{}}}.
>  * {{{}TABLE_ID{}}}: add {{{}TableIdHashFunctionProvider{}}}, which hashes 
> solely on {{TableId}} (namespace / schema / table) and never reads the record 
> payload.
> h3. 3. Expose a pipeline configuration option
> Add a {{pipeline.*}} option (see below) and read it in 
> {{FlinkPipelineComposer}} / {{{}PartitioningTranslator{}}}:
>  * When the option is set explicitly, use the corresponding provider.
>  * When it is unset, fall back to 
> {{DataSink#getDataChangeEventHashFunctionProvider()}} so existing behavior is 
> fully preserved.
> h3. 4. Scope
> Apply to:
>  * {{RegularPrePartitionOperator}}
>  * {{BatchRegularPrePartitionOperator}}
>  * {{DistributedPrePartitionOperator}}
> The change is symmetric across the three; injection happens in 
> {{{}PartitioningTranslator{}}}.
> Note: the {{PrePartitionOperator}} in {{flink-cdc-pipeline-connector-paimon}} 
> is involved with cross-partition upsert and bucket modes. It continues to 
> pick its own provider and is *out of scope* for this issue.
> h2. Configuration
> Add a new pipeline option:
> |Key|Type|Default|Description|
> |{{partitioning.hash-function-strategy}}|enum|_(unset — falls back to the 
> sink-provided provider)_|Partitioning strategy. Allowed values: 
> {{{}PRIMARY_KEY{}}}, {{{}TABLE_ID{}}}. When set, this overrides the 
> {{HashFunctionProvider}} returned by the sink.|
> Behavior:
>  * {*}Unset{*}: behavior is identical to today — 
> {{DataSink#getDataChangeEventHashFunctionProvider()}} is used.
>  * {{{}PRIMARY_KEY{}}}: forces 
> {{{}DefaultDataChangeEventHashFunctionProvider{}}}, ignoring any 
> sink-provided provider.
>  * {{{}TABLE_ID{}}}: uses {{{}TableIdHashFunctionProvider{}}}, ignoring any 
> sink-provided provider.
>  * {*}Invalid value{*}: composer-level validation fails fast at job 
> submission with a clear error message.
> YAML example:
>  
> {code:java}
> pipeline:
>   name: my-cdc-job
>   partitioning.hash-function-strategy: TABLE_ID {code}
>  
> h2. Compatibility
>  * {*}Configuration{*}: the option is additive and optional. When unset, 
> behavior is byte-for-byte identical to today.
>  * {*}API{*}: no breaking changes to {{HashFunctionProvider}} or 
> {{{}DataSink#getDataChangeEventHashFunctionProvider{}}}. 
> {{HashFunctionStrategy}} and {{TableIdHashFunctionProvider}} are new classes.
>  * {*}State{*}: the {{HashFunction}} cache is {{transient}} and not part of 
> any checkpoint. Switching strategies requires a job restart but does *not* 
> require state cleanup.
>  * {*}Downstream consistency{*}: switching between {{PRIMARY_KEY}} and 
> {{TABLE_ID}} (in either direction) changes where events for a given key land. 
> Switching mid-flight on a running job can cause transient cross-subtask 
> reordering for the same key. This must be documented as a "restart from a 
> clean checkpoint" change.
>  * {*}Override semantics for sink-supplied providers{*}: when the user sets 
> the option explicitly, the sink's own provider is bypassed. Documentation 
> must call this out (e.g. for Paimon's runtime path, overriding may degrade 
> bucket-routing optimality).
> h2. Acceptance Criteria
> h3. Functional
>  * Add {{HashFunctionStrategy}} enum with {{PRIMARY_KEY}} and 
> {{{}TABLE_ID{}}}.
>  * Add {{{}TableIdHashFunctionProvider{}}}. Hash is derived from {{TableId}} 
> only and is stable for the same {{{}TableId{}}}.
>  * {{PartitioningTranslator}} / composer wiring picks a provider based on 
> {{{}HashFunctionStrategy{}}}; falls back to the sink-provided provider when 
> unset.
>  * The change applies to {{{}RegularPrePartitionOperator{}}}, 
> {{{}BatchRegularPrePartitionOperator{}}}, and 
> {{{}DistributedPrePartitionOperator{}}}.
> h3. Configuration
>  * New {{partitioning.hash-function-strategy}} option, configurable via YAML 
> and threaded through composer to translator.
>  * Invalid values produce a clear error during job startup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to