[
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
鲨鱼辣椒 updated FLINK-39321:
-------------------------
Description:
It seems that this issue also exists in other newer versions; feel free to test
it if you are interested.
h2. Bug Description
When using Flink CDC to write to a Paimon dynamic bucket table with
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following
exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
	at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
{code}
h2. Reproduction Steps
# Create a Paimon table with dynamic bucket mode (bucket=-1) and set dynamic-bucket.initial-buckets=1
# Configure the Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4)
# Start the Flink CDC job to write data to the Paimon table
# Observe the exception in the task manager logs
h2. Detailed Root Cause Analysis
h3. Step 1: PaimonHashFunction Calculates Routing
Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction
PaimonHashFunction is responsible for calculating which subtask each record should be routed to. It uses RowAssignerChannelComputer from Paimon:
{code:java}
// PaimonHashFunction.java
public PaimonHashFunction(
        Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
    this.parallelism = parallelism;
    Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
    FileStoreTable table =
            (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
    if (table instanceof AppendOnlyFileStore) {
        this.fieldGetters = null;
        channelComputer = null;
    } else {
        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
        channelComputer.setup(parallelism);
    }
}

@Override
public int hashcode(DataChangeEvent event) {
    if (channelComputer != null) {
        GenericRow genericRow =
                PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
        return channelComputer.channel(genericRow);
    } else {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
{code}
Key point: channelComputer.setup(parallelism) is called with parallelism=4.
h3. Step 2: RowAssignerChannelComputer Computes recordAssignId
Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer
RowAssignerChannelComputer is responsible for computing the assigner ID for
each record:
{code:java}
// RowAssignerChannelComputer.java
@Override
public void setup(int numChannels) {
    this.numChannels = numChannels;
    this.numAssigners = MathUtils.min(numAssigners, numChannels);
    this.extractor = new RowPartitionKeyExtractor(schema);
}

@Override
public int channel(InternalRow record) {
    int partitionHash = extractor.partition(record).hashCode();
    int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
    return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
{code}
Calculation (when initial-buckets=1 and parallelism=4):
* numChannels = 4 (from setup(parallelism))
* numAssigners = MathUtils.min(1, 4) = 1
* recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
* recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0
Result: all records get recordAssignId = 0, meaning they should all route to subtask 0.
This is correct behavior: when numAssigners=1, all data should route to assigner 0.
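The collapse to assigner 0 can be checked in isolation. The helper below mirrors the modulo formula as quoted in this analysis; it is a sketch only, and the exact Paimon implementation of computeAssigner may normalize negative hashes differently:

{code:java}
public class AssignerDemo {
    // Mirrors the quoted formula; Math.floorMod keeps negative hashes non-negative.
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.floorMod(
                Math.floorMod(partitionHash, numAssigners)
                        + Math.floorMod(keyHash, numAssigners),
                numChannels);
    }

    public static void main(String[] args) {
        int numChannels = 4;  // Flink parallelism
        int numAssigners = 1; // dynamic-bucket.initial-buckets
        // Any hash pair collapses to assigner 0 when numAssigners = 1,
        // because x % 1 == 0 for every x.
        int[][] samples = {{17, -3}, {Integer.MAX_VALUE, 42}, {-8, -8}};
        for (int[] s : samples) {
            System.out.println(
                    "hashes " + s[0] + "/" + s[1] + " -> assigner "
                            + computeAssigner(s[0], s[1], numChannels, numAssigners));
        }
    }
}
{code}

All three sample lines print assigner 0; with numAssigners=2 the same formula would start spreading keys across assigners 0 and 1.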
h3. Step 3: BucketAssignOperator Creates HashBucketAssigner
Location: org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator
BucketAssignOperator is responsible for creating a HashBucketAssigner instance on each subtask:
{code:java}
// BucketAssignOperator.java (before fix)
public void open() throws Exception {
    super.open();
    int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
    int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
    int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
    int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);
    assigner = new HashBucketAssigner(
            minAssigners,       // numAssigners
            totalTasksNumber,   // numChannels
            currentTaskNumber); // assignId
}
{code}
Parameters (when initial-buckets=1 and parallelism=4):
* numAssigners = 1
* totalTasksNumber = 4
* currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
* minAssigners = MathUtils.min(1, 4) = 1
HashBucketAssigner creation:
* Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
* Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
* Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
* Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)
h3. Step 4: HashBucketAssigner Validates recordAssignId
Location: org.apache.paimon.index.HashBucketAssigner
HashBucketAssigner validates that the recordAssignId matches its own assignId:
{code:java}
// HashBucketAssigner.java
@Override
public int assign(BinaryRow partition, int hash) {
    int partitionHash = partition.hashCode();
    int recordAssignId = computeAssignId(partitionHash, hash);
    checkArgument(
            recordAssignId == assignId,
            "This is a bug, record assign id %s should equal to assign id %s.",
            recordAssignId,
            assignId);
    // ... (the actual bucket assignment follows; elided here)
}

private int computeAssignId(int partitionHash, int hash) {
    return computeAssigner(partitionHash, hash, numChannels, numAssigners);
}
{code}
Validation (when initial-buckets=1 and parallelism=4):
* recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
* Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
* Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
* Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
* Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
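Played forward per subtask, the check behaves exactly as the list above shows. This simulation uses hypothetical names (the real HashBucketAssigner constructor takes more arguments) but applies the same equality check:

{code:java}
public class ValidationDemo {
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.floorMod(
                Math.floorMod(partitionHash, numAssigners)
                        + Math.floorMod(keyHash, numAssigners),
                numChannels);
    }

    // True when the checkArgument in HashBucketAssigner.assign would pass
    // for a record with these hashes arriving at the subtask with this assignId.
    static boolean validationPasses(
            int assignId, int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return computeAssigner(partitionHash, keyHash, numChannels, numAssigners) == assignId;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int numAssigners = Math.min(1, parallelism); // initial-buckets = 1
        for (int subtask = 0; subtask < parallelism; subtask++) {
            boolean ok = validationPasses(subtask, 17, -3, parallelism, numAssigners);
            System.out.println("subtask " + subtask + ": " + (ok ? "PASSES" : "FAILS"));
        }
    }
}
{code}

Only subtask 0 passes; any record that reaches subtasks 1-3 triggers the IllegalArgumentException.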
h3. The Conflict
The core conflict is between two components:
# PaimonHashFunction (via RowAssignerChannelComputer):
#* Calculates recordAssignId = 0 for all records when numAssigners=1
#* This is correct: all data should route to assigner 0
# BucketAssignOperator (which creates HashBucketAssigner):
#* Creates each HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
#* Expects each subtask to receive only records with a matching recordAssignId
#* But all records have recordAssignId=0, so only subtask 0 passes validation
The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all records to subtask 0, but BucketAssignOperator creates HashBucketAssigner instances with a different assignId on each subtask, causing validation failures.
h2. Solution
Modify BucketAssignOperator.java to align with the design principle that when numAssigners=1, all data should route to assigner 0.
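The patch itself is not reproduced here, but the design principle can be sketched: the router and each subtask's assigner must derive recordAssignId from the same (numChannels, numAssigners) pair, so that a subtask only validates records it can actually receive. The names below are hypothetical illustrations, not the actual fix:

{code:java}
public class AlignedAssignDemo {
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.floorMod(
                Math.floorMod(partitionHash, numAssigners)
                        + Math.floorMod(keyHash, numAssigners),
                numChannels);
    }

    // Routes each record, then re-runs the assigner-side computation on the
    // receiving subtask; returns true when no record would trip checkArgument.
    static boolean noValidationFailures(int[][] records, int parallelism, int initialBuckets) {
        int numAssigners = Math.min(initialBuckets, parallelism);
        for (int[] r : records) {
            int channel = computeAssigner(r[0], r[1], parallelism, numAssigners);        // router side
            int recordAssignId = computeAssigner(r[0], r[1], parallelism, numAssigners); // assigner side
            if (recordAssignId != channel) {
                return false; // would throw "This is a bug, record assign id ..."
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[][] records = {{17, -3}, {99, 7}, {-5, 11}, {Integer.MIN_VALUE, 0}};
        // With initial-buckets=1 and parallelism=4, every record routes to subtask 0,
        // whose check passes; subtasks 1-3 simply receive nothing.
        System.out.println(noValidationFailures(records, 4, 1));
    }
}
{code}

Because both sides share one computation, the invariant recordAssignId == assignId holds by construction on every subtask that receives data.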
> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID
> error.
> -------------------------------------------------------------------------------
>
> Key: FLINK-39321
> URL: https://issues.apache.org/jira/browse/FLINK-39321
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: 鲨鱼辣椒
> Assignee: 鲨鱼辣椒
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)