[ 
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39321:
-----------------------------------
    Labels: pull-request-available  (was: )

> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID 
> error.
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-39321
>                 URL: https://issues.apache.org/jira/browse/FLINK-39321
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: RuiFeng Zhao
>            Assignee: RuiFeng Zhao
>            Priority: Major
>              Labels: pull-request-available
>
> It seems that this issue also exists in other newer versions; feel free to 
> test it if you are interested.
> h2. Bug Description
> When using Flink CDC to write to a Paimon dynamic bucket table with 
> dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following 
> exception is thrown:
> {code}
> java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
>     at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
> {code}
> h2. Reproduction Steps
>  # Create a Paimon table with dynamic bucket mode ( bucket=-1 ) and set 
> dynamic-bucket.initial-buckets=1
>  # Configure Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4)
>  # Start the Flink CDC job to write data to the Paimon table
>  # Observe the exception in the task manager logs
> h2. Detailed Root Cause Analysis
> h3. Step 1: PaimonHashFunction Calculates Routing
> Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction
> PaimonHashFunction is responsible for calculating which subtask each record 
> should be routed to. It uses RowAssignerChannelComputer from Paimon:
> {code:java}
> // PaimonHashFunction.java
> public PaimonHashFunction(
>         Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
>     this.parallelism = parallelism;
>     Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
>     FileStoreTable table =
>             (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
>     if (table instanceof AppendOnlyFileStore) {
>         this.fieldGetters = null;
>         channelComputer = null;
>     } else {
>         this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
>         channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
>         channelComputer.setup(parallelism);
>     }
> }
> 
> @Override
> public int hashcode(DataChangeEvent event) {
>     if (channelComputer != null) {
>         GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
>         return channelComputer.channel(genericRow);
>     } else {
>         return ThreadLocalRandom.current().nextInt(parallelism);
>     }
> }
> {code}
> Key Point: channelComputer.setup(parallelism) is called with parallelism=4.
> h3. Step 2: RowAssignerChannelComputer Computes recordAssignId
> Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer
> RowAssignerChannelComputer is responsible for computing the assigner ID for 
> each record:
> {code:java}
> // RowAssignerChannelComputer.java
> @Override
> public void setup(int numChannels) {
>     this.numChannels = numChannels;
>     this.numAssigners = MathUtils.min(numAssigners, numChannels);
>     this.extractor = new RowPartitionKeyExtractor(schema);
> }
> 
> @Override
> public int channel(InternalRow record) {
>     int partitionHash = extractor.partition(record).hashCode();
>     int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
>     return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
> }
> {code}
>  
> Calculation (when initial-buckets=1 and parallelism=4):
>  * numChannels = 4 (from setup(parallelism))
>  * numAssigners = MathUtils.min(1, 4) = 1
>  * recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
>  * recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0
> Result: all records have recordAssignId = 0, meaning they should all route to subtask 0.
> This is correct behavior: when numAssigners=1, all data should route to assigner 0.
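> The routing arithmetic above can be checked with a small stand-alone sketch. Note that computeAssigner here is a hypothetical stand-in following the formula quoted in this analysis, (partitionHash % numAssigners + keyHash % numAssigners) % numChannels; Paimon's real implementation may differ in details such as negative-hash handling:
> {code:java}
> public class AssignerRoutingSketch {
>     // Hypothetical stand-in for Paimon's computeAssigner, per the formula above.
>     static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
>         return Math.abs((partitionHash % numAssigners + keyHash % numAssigners) % numChannels);
>     }
> 
>     public static void main(String[] args) {
>         int numChannels = 4;                         // Flink parallelism
>         int numAssigners = Math.min(1, numChannels); // dynamic-bucket.initial-buckets = 1
>         // Whatever the hashes are, x % 1 == 0, so the result is always 0.
>         for (int partitionHash : new int[] {7, -13, 42}) {
>             for (int keyHash : new int[] {3, 99, -8}) {
>                 System.out.println(computeAssigner(partitionHash, keyHash, numChannels, numAssigners));
>             }
>         }
>     }
> }
> {code}
> With numAssigners=1, every combination prints 0, which is exactly the "route everything to assigner 0" behavior described above.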
> h3. Step 3: BucketAssignOperator Creates HashBucketAssigner
> Location: org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator
> BucketAssignOperator is responsible for creating HashBucketAssigner instances 
> for each subtask:
> {code:java}
> // BucketAssignOperator.java (before fix)
> public void open() throws Exception {
>     super.open();
>     int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
>     int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
>     int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
>     int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);
>     assigner =
>             new HashBucketAssigner(
>                     minAssigners,
>                     totalTasksNumber, // numChannels
>                     currentTaskNumber // assignId
>             );
> }
> {code}
> Parameters (when initial-buckets=1 and parallelism=4):
>  * numAssigners = 1
>  * totalTasksNumber = 4
>  * currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
>  * minAssigners = MathUtils.min(1, 4) = 1
> HashBucketAssigner creation:
>  * Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
>  * Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
>  * Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
>  * Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)
> h3. Step 4: HashBucketAssigner Validates recordAssignId
> Location: org.apache.paimon.index.HashBucketAssigner
> HashBucketAssigner validates that the recordAssignId matches the assignId:
> {code:java}
> // HashBucketAssigner.java
> @Override
> public int assign(BinaryRow partition, int hash) {
>     int partitionHash = partition.hashCode();
>     int recordAssignId = computeAssignId(partitionHash, hash);
>     checkArgument(
>             recordAssignId == assignId,
>             "This is a bug, record assign id %s should equal to assign id %s.",
>             recordAssignId, assignId);
>     // ... remainder of the method (bucket computation) elided ...
> }
> 
> private int computeAssignId(int partitionHash, int hash) {
>     return computeAssigner(partitionHash, hash, numChannels, numAssigners);
> }
> {code}
> Validation (when initial-buckets=1 and parallelism=4):
>  * recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
>  * Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
>  * Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
>  * Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
>  * Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
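> Taken together, the check failure can be reproduced in isolation with a minimal sketch (names are hypothetical; the real HashBucketAssigner carries more state and constructor arguments):
> {code:java}
> public class AssignMismatchSketch {
>     // Simplified stand-in for HashBucketAssigner's validation in assign().
>     static void validate(int recordAssignId, int assignId) {
>         if (recordAssignId != assignId) {
>             throw new IllegalArgumentException(String.format(
>                     "This is a bug, record assign id %s should equal to assign id %s.",
>                     recordAssignId, assignId));
>         }
>     }
> 
>     public static void main(String[] args) {
>         int recordAssignId = 0; // every record, since numAssigners = 1
>         for (int subtask = 0; subtask < 4; subtask++) {
>             try {
>                 validate(recordAssignId, subtask); // assignId = currentTaskNumber
>                 System.out.println("Subtask " + subtask + ": PASSES");
>             } catch (IllegalArgumentException e) {
>                 System.out.println("Subtask " + subtask + ": FAILS");
>             }
>         }
>     }
> }
> {code}
> Only subtask 0 passes; subtasks 1-3 hit the same IllegalArgumentException reported in the logs.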
> h3. The Conflict
> The core conflict is between two components:
>  # PaimonHashFunction (via RowAssignerChannelComputer):
>  * Calculates recordAssignId = 0 for all records when numAssigners=1
>  * This is correct: all data should route to assigner 0
>  # BucketAssignOperator (creates HashBucketAssigner):
>  * Creates HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
>  * Expects each subtask to receive only records with a matching recordAssignId
>  * But all records have recordAssignId=0, so only subtask 0 passes validation
> The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all records to assigner 0, but BucketAssignOperator creates HashBucketAssigner instances with a different assignId per subtask, causing validation failures on every subtask except 0.
> h2. Solution
> Modify BucketAssignOperator.java to align with the design principle that when numAssigners=1, all data should route to assigner 0; the actual change is in the linked pull request.
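> As a sketch only (the real change is in the linked pull request, and HashBucketAssigner's actual constructor takes more arguments), one way to honor that principle is to clamp each subtask's assignId into [0, minAssigners) so that it can match a recordAssignId the channel computer actually produces. The alignedAssignId helper below is hypothetical, introduced purely to illustrate the idea:
> {code:java}
> public class AssignIdAlignmentSketch {
>     // Hypothetical helper: clamp the subtask index into [0, minAssigners)
>     // so every subtask's assignId can match a reachable recordAssignId.
>     static int alignedAssignId(int currentTaskNumber, int initialBuckets, int parallelism) {
>         int minAssigners = Math.min(initialBuckets, parallelism);
>         return currentTaskNumber % minAssigners;
>     }
> 
>     public static void main(String[] args) {
>         // initial-buckets = 1, parallelism = 4: every subtask gets assignId 0,
>         // matching recordAssignId = 0 from the channel computer.
>         for (int subtask = 0; subtask < 4; subtask++) {
>             System.out.println("Subtask " + subtask + " -> assignId "
>                     + alignedAssignId(subtask, 1, 4));
>         }
>     }
> }
> {code}
> Whether the merged fix uses exactly this clamping should be verified against the pull request itself.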



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
