[
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
鲨鱼辣椒 updated FLINK-39321:
-------------------------
Description:
*Problem Background*
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, an assigner id
mismatch causes the job to fail.
*Problem Analysis*
When {{minAssigners=1}}:
 * PaimonHashFunction (via RowAssignerChannelComputer) calculates
{{recordAssignId = (partitionHash % 1 + keyHash % 1) % numChannels = 0}}.
 * BucketAssignOperator expects all data to be routed to assigner 0.
 * However, BucketAssignOperator uses {{numChannels=totalTasksNumber}}, so
data is routed to all subtasks (e.g. 0, 1, 2, 3) while {{recordAssignId=0}}
for all records.
 * This causes validation failures when data enters subtasks other than 0.
{code:java}
// BucketAssigner.java
static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
    return bucket % numAssigners == assignId % numAssigners;
}{code}
The following exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should
equal to assign id ?.{code}
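The mismatch described above can be reproduced with a minimal sketch. The class name, hash values, and parallelism below are hypothetical; only {{isMyBucket}} mirrors the simplified check quoted from BucketAssigner.java:

```java
// Hypothetical sketch of the assigner-id mismatch when
// dynamic-bucket.initial-buckets=1 and parallelism > 1.
public class AssignIdMismatchSketch {

    // Simplified copy of the check quoted from BucketAssigner.java.
    static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
        return bucket % numAssigners == assignId % numAssigners;
    }

    public static void main(String[] args) {
        int minAssigners = 1;      // dynamic-bucket.initial-buckets=1
        int numChannels = 4;       // Flink parallelism > 1 (example value)
        int partitionHash = 12345; // arbitrary example hashes
        int keyHash = 67890;

        // As in the analysis: anything % 1 is 0, so recordAssignId is always 0.
        int recordAssignId =
                (partitionHash % minAssigners + keyHash % minAssigners) % numChannels;

        // Records are nevertheless routed across all subtasks 0..numChannels-1,
        // so every subtask other than 0 sees a record whose assign id it rejects.
        for (int subtask = 0; subtask < numChannels; subtask++) {
            System.out.println("subtask " + subtask
                    + ": recordAssignId=" + recordAssignId
                    + ", matches=" + (recordAssignId == subtask));
        }
    }
}
```

Only subtask 0 matches; on subtasks 1 to 3 the validation fails, matching the exception above.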
I have attempted to fix this bug; by comparison, the latest version still
appears to contain it.
was:
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following
exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should
equal to assign id ?.{code}
I have attempted to fix this bug, and by comparison, the latest version appears
to still have this bug.
> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID
> error.
> -------------------------------------------------------------------------------
>
> Key: FLINK-39321
> URL: https://issues.apache.org/jira/browse/FLINK-39321
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: 鲨鱼辣椒
> Priority: Major
>
> *Problem Background*
> When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with
> dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, an assigner id
> mismatch causes the job to fail.
> *Problem Analysis*
> When {{minAssigners=1}}:
> * PaimonHashFunction (via RowAssignerChannelComputer) calculates
> {{recordAssignId = (partitionHash % 1 + keyHash % 1) % numChannels = 0}}.
> * BucketAssignOperator expects all data to be routed to assigner 0.
> * However, BucketAssignOperator uses {{numChannels=totalTasksNumber}},
> so data is routed to all subtasks (e.g. 0, 1, 2, 3) while
> {{recordAssignId=0}} for all records.
> * This causes validation failures when data enters subtasks other than 0.
>
> {code:java}
> // BucketAssigner.java
> static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
>     return bucket % numAssigners == assignId % numAssigners;
> }{code}
> The following exception is thrown:
> {code:java}
> java.lang.IllegalArgumentException: This is a bug, record assign id ? should
> equal to assign id ?.{code}
> I have attempted to fix this bug; by comparison, the latest version still
> appears to contain it.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)