[ https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

鲨鱼辣椒 updated FLINK-39321:
-------------------------
    Description: 
This issue appears to exist in newer versions as well; feel free to test it if you are interested.
h2. Bug Description

When using Flink CDC to write to a Paimon dynamic bucket table with 
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following 
exception is thrown:
{noformat}
java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
    at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
{noformat}
h2. Reproduction Steps
 # Create a Paimon table with dynamic bucket mode ( bucket=-1 ) and set 
dynamic-bucket.initial-buckets=1
 # Configure a Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4); a minimal pipeline definition is sketched after this list
 # Start the Flink CDC job to write data to the Paimon table
 # Observe the exception in the task manager logs
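For concreteness, a reproducing pipeline definition could look like the following sketch. The MySQL source, hostname, credentials, warehouse path, and table names are placeholders, not part of the original report:
{code:yaml}
# Hypothetical repro pipeline; connection details and table names are placeholders.
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: app_user
  password: app_pw
  tables: app_db.orders

sink:
  type: paimon
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /tmp/paimon
  table.properties.bucket: -1
  table.properties.dynamic-bucket.initial-buckets: 1

pipeline:
  name: mysql-to-paimon-repro
  parallelism: 4
{code}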

h2. Detailed Root Cause Analysis
h3. Step 1: PaimonHashFunction Calculates Routing

Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction

PaimonHashFunction is responsible for calculating which subtask each record 
should be routed to. It uses RowAssignerChannelComputer from Paimon:
{code:java}
// PaimonHashFunction.java
public PaimonHashFunction(
        Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
    this.parallelism = parallelism;
    Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
    FileStoreTable table =
            (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));

    if (table instanceof AppendOnlyFileStore) {
        this.fieldGetters = null;
        channelComputer = null;
    } else {
        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
        channelComputer.setup(parallelism);
    }
}

@Override
public int hashcode(DataChangeEvent event) {
    if (channelComputer != null) {
        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
        return channelComputer.channel(genericRow);
    } else {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
{code}
Key Point: channelComputer.setup(parallelism) is called with parallelism=4.
h3. Step 2: RowAssignerChannelComputer Computes recordAssignId

Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer

RowAssignerChannelComputer is responsible for computing the assigner ID for 
each record:
{code:java}
// RowAssignerChannelComputer.java
@Override
public void setup(int numChannels) {
    this.numChannels = numChannels;
    this.numAssigners = MathUtils.min(numAssigners, numChannels);
    this.extractor = new RowPartitionKeyExtractor(schema);
}

@Override
public int channel(InternalRow record) {
    int partitionHash = extractor.partition(record).hashCode();
    int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
    return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
{code}
Calculation (when initial-buckets=1 and parallelism=4):
 * numChannels = 4 (from setup(parallelism))
 * numAssigners = MathUtils.min(1, 4) = 1
 * recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
 * recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0

Result: All records have recordAssignId = 0, meaning they should all route to subtask 0.

This is correct behavior: when numAssigners=1, all data should route to assigner 0.
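To make the arithmetic concrete, here is a tiny standalone sketch of the simplified formula above. The sample hash values are arbitrary, and this mirrors the simplification used in this report rather than Paimon's actual computeAssigner implementation:
{code:java}
// Standalone sketch of the simplified assigner arithmetic described above.
// Mirrors this report's simplification, not Paimon's exact implementation.
public class AssignIdDemo {

    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return (partitionHash % numAssigners + keyHash % numAssigners) % numChannels;
    }

    public static void main(String[] args) {
        int numChannels = 4;  // Flink parallelism
        int numAssigners = 1; // dynamic-bucket.initial-buckets
        int[][] sampleHashes = {{17, 42}, {-3, 1234}, {999, -77}}; // arbitrary examples
        for (int[] h : sampleHashes) {
            // x % 1 is always 0, so every record gets recordAssignId = 0.
            System.out.println(computeAssigner(h[0], h[1], numChannels, numAssigners));
        }
    }
}
{code}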
h3. Step 3: BucketAssignOperator Creates HashBucketAssigner

Location: org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator

BucketAssignOperator is responsible for creating HashBucketAssigner instances 
for each subtask:
{code:java}
// BucketAssignOperator.java (before fix)
public void open() throws Exception {
    super.open();
    int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
    int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
    int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();

    int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);

    assigner = new HashBucketAssigner(
            minAssigners,      // numAssigners
            totalTasksNumber,  // numChannels
            currentTaskNumber  // assignId
    );
}
{code}
Parameters (when initial-buckets=1 and parallelism=4):
 * numAssigners = 1
 * totalTasksNumber = 4
 * currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
 * minAssigners = MathUtils.min(1, 4) = 1

HashBucketAssigner creation:
 * Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
 * Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
 * Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
 * Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)

h3. Step 4: HashBucketAssigner Validates recordAssignId

Location: org.apache.paimon.index.HashBucketAssigner

HashBucketAssigner validates that the recordAssignId matches the assignId:
{code:java}
// HashBucketAssigner.java
@Override
public int assign(BinaryRow partition, int hash) {
    int partitionHash = partition.hashCode();
    int recordAssignId = computeAssignId(partitionHash, hash);

    checkArgument(
            recordAssignId == assignId,
            "This is a bug, record assign id %s should equal to assign id %s.",
            recordAssignId,
            assignId);
    // ... (bucket assignment continues after the check)
}

private int computeAssignId(int partitionHash, int hash) {
    return computeAssigner(partitionHash, hash, numChannels, numAssigners);
}
{code}
Validation (when initial-buckets=1 and parallelism=4; a runnable sketch of this check follows the list):
 * recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
 * Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
 * Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
 * Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
 * Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
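Here is a standalone sketch of that per-subtask outcome; the checkArgument semantics are inlined rather than depending on Paimon classes:
{code:java}
// Standalone sketch of the per-subtask validation outcome.
// The checkArgument semantics are inlined instead of using Paimon utilities.
public class ValidationDemo {
    public static void main(String[] args) {
        int parallelism = 4;
        int recordAssignId = 0; // always 0 when numAssigners = 1, as shown in Step 2
        for (int assignId = 0; assignId < parallelism; assignId++) {
            if (recordAssignId == assignId) {
                System.out.printf("Subtask %d: PASSES%n", assignId);
            } else {
                // HashBucketAssigner throws IllegalArgumentException here.
                System.out.printf(
                        "Subtask %d: FAILS - record assign id %d should equal to assign id %d%n",
                        assignId, recordAssignId, assignId);
            }
        }
    }
}
{code}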

h3. The Conflict

The core conflict is between two components:
 # PaimonHashFunction (via RowAssignerChannelComputer):
 #* Calculates recordAssignId = 0 for all records when numAssigners=1
 #* This is correct: all data should route to assigner 0
 # BucketAssignOperator (creates HashBucketAssigner):
 #* Creates HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
 #* Expects each subtask to receive records with a matching recordAssignId
 #* But all records have recordAssignId=0, so only subtask 0 passes validation

The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all records to subtask 0, but BucketAssignOperator creates HashBucketAssigner instances with a different assignId on each subtask, causing validation failures on subtasks 1-3.
h2. Solution

Modify BucketAssignOperator.java to align with the design principle that when numAssigners=1, all data should route to assigner 0.
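As a hedged illustration of that principle, one possible alignment is sketched below: map each subtask's assignId into the clamped assigner range. The modulo mapping is this sketch's assumption, not necessarily the committed fix:
{code:java}
// Hypothetical sketch only - not necessarily the committed fix. It keeps the
// operator's parameters consistent with RowAssignerChannelComputer, so that
// with numAssigners=1 every subtask uses assignId=0 (and per the analysis
// above, only subtask 0 receives records anyway).
public void open() throws Exception {
    super.open();
    int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
    int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
    int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();

    // Clamp exactly as RowAssignerChannelComputer.setup() does.
    int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);

    assigner = new HashBucketAssigner(
            minAssigners,                       // numAssigners
            totalTasksNumber,                   // numChannels
            currentTaskNumber % minAssigners);  // assignId (assumption of this sketch)
}
{code}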


> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID 
> error.
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-39321
>                 URL: https://issues.apache.org/jira/browse/FLINK-39321
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: 鲨鱼辣椒
>            Assignee: 鲨鱼辣椒
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
