[ https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39321:
-----------------------------------
Labels: pull-request-available (was: )
> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID
> error.
> -------------------------------------------------------------------------------
>
> Key: FLINK-39321
> URL: https://issues.apache.org/jira/browse/FLINK-39321
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: RuiFeng Zhao
> Assignee: RuiFeng Zhao
> Priority: Major
> Labels: pull-request-available
>
> It seems that this issue also exists in other newer versions; feel free to
> test it if you are interested.
> h2. Bug Description
> When using Flink CDC to write to a Paimon dynamic bucket table with
> dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following
> exception is thrown:
> {noformat}
> java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
>     at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
> {noformat}
> h2. Reproduction Steps
> # Create a Paimon table with dynamic bucket mode (bucket=-1) and set
> dynamic-bucket.initial-buckets=1
> # Configure Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4)
> # Start the Flink CDC job to write data to the Paimon table
> # Observe the exception in the task manager logs
> h2. Detailed Root Cause Analysis
> h3. Step 1: PaimonHashFunction Calculates Routing
> Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction
> PaimonHashFunction is responsible for calculating which subtask each record
> should be routed to. It uses RowAssignerChannelComputer from Paimon:
> {code:java}
> // PaimonHashFunction.java
> public PaimonHashFunction(
>         Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
>     this.parallelism = parallelism;
>     Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
>     FileStoreTable table =
>             (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
>     if (table instanceof AppendOnlyFileStore) {
>         this.fieldGetters = null;
>         channelComputer = null;
>     } else {
>         this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
>         channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
>         channelComputer.setup(parallelism);
>     }
> }
>
> @Override
> public int hashcode(DataChangeEvent event) {
>     if (channelComputer != null) {
>         GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
>         return channelComputer.channel(genericRow);
>     } else {
>         return ThreadLocalRandom.current().nextInt(parallelism);
>     }
> }
> {code}
> Key point: channelComputer.setup(parallelism) is called with parallelism=4.
> h3. Step 2: RowAssignerChannelComputer Computes recordAssignId
> Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer
> RowAssignerChannelComputer is responsible for computing the assigner ID for
> each record:
> {code:java}
> // RowAssignerChannelComputer.java
> @Override
> public void setup(int numChannels) {
>     this.numChannels = numChannels;
>     this.numAssigners = MathUtils.min(numAssigners, numChannels);
>     this.extractor = new RowPartitionKeyExtractor(schema);
> }
>
> @Override
> public int channel(InternalRow record) {
>     int partitionHash = extractor.partition(record).hashCode();
>     int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
>     return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
> }
> {code}
>
> Calculation (when initial-buckets=1 and parallelism=4):
> * numChannels = 4 (from setup(parallelism))
> * numAssigners = MathUtils.min(1, 4) = 1
> * recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
> * recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0
>
> Result: All records have recordAssignId = 0, meaning they should all route to subtask 0.
> This is correct behavior: when numAssigners=1, all data should route to assigner 0.
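The arithmetic above can be checked with a small standalone sketch. This is hypothetical demo code, not taken from Paimon; it only mirrors the formula quoted in this step:

```java
// Standalone demo of the assign-id formula described in Step 2.
// Not the real Paimon implementation -- just the arithmetic from this analysis.
public class AssignIdDemo {

    // (partitionHash % numAssigners + keyHash % numAssigners) % numChannels
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.abs((partitionHash % numAssigners + keyHash % numAssigners) % numChannels);
    }

    public static void main(String[] args) {
        int numChannels = 4;   // Flink parallelism
        int numAssigners = 1;  // min(initial-buckets=1, parallelism=4)
        // Whatever the hash values are, numAssigners=1 forces x % 1 == 0 for both terms,
        // so every record gets recordAssignId = 0.
        for (int h = 1; h <= 1000; h++) {
            if (computeAssigner(h * 31, h * 17, numChannels, numAssigners) != 0) {
                throw new AssertionError("unexpected assign id for h=" + h);
            }
        }
        System.out.println("all records -> recordAssignId = 0");
    }
}
```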
> h3. Step 3: BucketAssignOperator Creates HashBucketAssigner
> Location:
> org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator
> BucketAssignOperator is responsible for creating HashBucketAssigner instances
> for each subtask:
> {code:java}
> // BucketAssignOperator.java (before fix)
> public void open() throws Exception {
>     super.open();
>     int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
>     int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
>     int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
>     int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);
>     assigner =
>             new HashBucketAssigner(
>                     minAssigners,
>                     totalTasksNumber,   // numChannels
>                     currentTaskNumber); // assignId
> }
> {code}
> Parameters (when initial-buckets=1 and parallelism=4):
> * numAssigners = 1
> * totalTasksNumber = 4
> * currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
> * minAssigners = MathUtils.min(1, 4) = 1
>
> HashBucketAssigner creation:
> * Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
> * Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
> * Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
> * Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)
> h3. Step 4: HashBucketAssigner Validates recordAssignId
> Location: org.apache.paimon.index.HashBucketAssigner
> HashBucketAssigner validates that the recordAssignId matches the assignId:
> {code:java}
> // HashBucketAssigner.java
> @Override
> public int assign(BinaryRow partition, int hash) {
>     int partitionHash = partition.hashCode();
>     int recordAssignId = computeAssignId(partitionHash, hash);
>     checkArgument(
>             recordAssignId == assignId,
>             "This is a bug, record assign id %s should equal to assign id %s.",
>             recordAssignId, assignId);
>     // ... bucket lookup elided
> }
>
> private int computeAssignId(int partitionHash, int hash) {
>     return computeAssigner(partitionHash, hash, numChannels, numAssigners);
> }
> {code}
> Validation (when initial-buckets=1 and parallelism=4):
> * recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
> * Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
> * Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
> * Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
> * Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
> h3. The Conflict
> The core conflict is between two components:
> # PaimonHashFunction (via RowAssignerChannelComputer):
> * Calculates recordAssignId = 0 for all records when numAssigners=1
> * This is correct: all data should route to assigner 0
> # BucketAssignOperator (creates HashBucketAssigner):
> * Creates HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
> * Expects each subtask to receive only records with a matching recordAssignId
> * But all records have recordAssignId=0, so only subtask 0 passes validation
> The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all
> records to subtask 0, but BucketAssignOperator creates HashBucketAssigner
> instances with a different assignId for each subtask, so validation fails on
> every subtask except subtask 0.
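The two-sided mismatch can be reproduced in isolation with a short sketch. This is a hypothetical demo using the simplified formula from this analysis, not the real Paimon/Flink CDC classes:

```java
// Demo of the conflict: routing always computes assign id 0, while the
// operator on subtask i (before the fix) expects assign id i.
public class AssignIdConflictDemo {

    // Simplified routing-side formula, as quoted in Step 2 of the analysis.
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.abs((partitionHash % numAssigners + keyHash % numAssigners) % numChannels);
    }

    public static void main(String[] args) {
        int parallelism = 4;     // numChannels
        int initialBuckets = 1;  // dynamic-bucket.initial-buckets
        int numAssigners = Math.min(initialBuckets, parallelism);

        // Routing side: every record gets recordAssignId = 0.
        int recordAssignId = computeAssigner(12345, 67890, parallelism, numAssigners);

        // Operator side (before fix): subtask i expects assignId = i,
        // so the checkArgument in HashBucketAssigner fails on subtasks 1..3.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            boolean passes = (recordAssignId == subtask);
            System.out.println("subtask " + subtask + ": " + (passes ? "PASSES" : "FAILS"));
        }
    }
}
```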
> h2. Solution
> Modify BucketAssignOperator.java to align with the design principle that when
> numAssigners=1, all data should route to assigner 0.
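The issue text elides the actual patch. As a purely hypothetical illustration of the stated principle (not the code from the linked pull request), one direction is to clamp the operator's expected assignId into the range [0, minAssigners), so that the operator-side expectation agrees with the routing-side formula. Sketched as a standalone demo:

```java
// Hypothetical sketch only -- NOT the actual fix from the pull request.
// Shows that clamping the expected assignId to [0, minAssigners) makes the
// operator-side expectation agree with the routing-side formula.
public class AssignIdAlignment {

    // Routing side, per the simplified formula quoted in Step 2.
    static int recordAssignId(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return Math.abs((partitionHash % numAssigners + keyHash % numAssigners) % numChannels);
    }

    // Operator side: expected assignId for a subtask, clamped to the assigner range.
    static int expectedAssignId(int subtaskIndex, int minAssigners) {
        return subtaskIndex % minAssigners;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int initialBuckets = 1;
        int minAssigners = Math.min(initialBuckets, parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // With minAssigners = 1, every subtask now expects assignId 0,
            // matching the recordAssignId = 0 that every record carries.
            int expected = expectedAssignId(subtask, minAssigners);
            int record = recordAssignId(31 * subtask + 7, 17 * subtask + 3, parallelism, minAssigners);
            if (expected != record) {
                throw new AssertionError("mismatch at subtask " + subtask);
            }
        }
        System.out.println("validation passes on all subtasks");
    }
}
```

Whatever shape the real fix takes, the invariant it must restore is the one stated above: the assignId a subtask validates against must be reachable by the routing formula for records delivered to that subtask.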
--
This message was sent by Atlassian Jira
(v8.20.10#820010)