[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082538#comment-16082538 ]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4301

[FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

This PR changes the mod operation used for partition assignment from `i % numTasks == subtaskIndex` (where `i` is the partition's position in the list) to `partition.hashCode % numTasks == subtaskIndex`, so that a partition's owner no longer depends on list order. The bug was originally introduced by #3378, which stopped sorting the partition list. The existing partition assignment tests were apparently not strict enough to catch this.

This PR additionally verifies that partitions end up in the expected subtasks, and that a different partition ordering still yields the same assignment.

Note: a fix is not required for the `master` branch, since the partition discovery changes there already fixed the issue indirectly. However, test coverage for deterministic assignment should be improved in `master` as well; a separate PR will be opened for that.
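To make the failure mode concrete, here is a minimal, self-contained sketch (not the connector's actual code) comparing the index-based rule with a key-based rule when two subtasks see the same partitions in different orders. The `Partition` class and the `assignByIndex`, `assignByHash`, and `countClaims` helpers are hypothetical names for illustration. The PR text writes `partition.hashCode % numTasks`; this sketch uses `Math.floorMod` instead, as an assumption, because Java's `%` returns a negative result for a negative `hashCode()`, which would match no subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class AssignmentDemo {

    /** Hypothetical stand-in for a Kafka topic partition: a topic name plus a partition id. */
    static final class Partition {
        final String topic;
        final int id;

        Partition(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) {
                return false;
            }
            Partition other = (Partition) o;
            return id == other.id && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, id);
        }

        @Override
        public String toString() {
            return topic + "-" + id;
        }
    }

    /** Buggy rule: ownership depends on where the partition sits in this subtask's list. */
    static List<Partition> assignByIndex(List<Partition> view, int numTasks, int subtask) {
        List<Partition> mine = new ArrayList<>();
        for (int i = 0; i < view.size(); i++) {
            if (i % numTasks == subtask) {
                mine.add(view.get(i));
            }
        }
        return mine;
    }

    /** Order-independent rule: ownership depends only on the partition itself. */
    static List<Partition> assignByHash(List<Partition> view, int numTasks, int subtask) {
        List<Partition> mine = new ArrayList<>();
        for (Partition p : view) {
            // floorMod keeps the result in [0, numTasks) even if hashCode() is negative.
            if (Math.floorMod(p.hashCode(), numTasks) == subtask) {
                mine.add(p);
            }
        }
        return mine;
    }

    /** Counts how many subtasks claim each partition; subtask k sees the list views.get(k). */
    static Map<Partition, Integer> countClaims(List<List<Partition>> views, boolean byHash) {
        int numTasks = views.size();
        Map<Partition, Integer> claims = new HashMap<>();
        for (int subtask = 0; subtask < numTasks; subtask++) {
            List<Partition> view = views.get(subtask);
            List<Partition> mine = byHash
                    ? assignByHash(view, numTasks, subtask)
                    : assignByIndex(view, numTasks, subtask);
            for (Partition p : mine) {
                claims.merge(p, 1, Integer::sum);
            }
        }
        return claims;
    }

    public static void main(String[] args) {
        List<Partition> view0 = Arrays.asList(new Partition("t", 0), new Partition("t", 1));
        List<Partition> view1 = new ArrayList<>(view0);
        Collections.reverse(view1); // subtask 1 sees [t-1, t-0]
        List<List<Partition>> views = Arrays.asList(view0, view1);

        System.out.println("by index: " + countClaims(views, false)); // {t-0=2}: t-1 is never read
        System.out.println("by hash:  " + countClaims(views, true));  // every partition claimed once
    }
}
```

With the index rule, both subtasks pick the partition at their own index, so `t-0` is read twice and `t-1` not at all, which matches the symptom reported in the issue. The hash rule guarantees exactly one owner per partition, but not an even spread: with few partitions, several may hash to the same subtask.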
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7143-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4301

----
commit 563f605d00f5d184fce2eb505c59033f22d3d0ab
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-07-11T17:03:01Z

[FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

----


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer: some partitions weren't
> assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If
> {{kafkaTopicPartitions}} has a different order on different subtasks, the
> assignment is not consistent across subtasks, which creates the assignment issue
> mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, i.e.
> {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That would result in an assignment that is stable across
> subtasks and independent of the ordering in the array.
> Marking it as a blocker because of its impact.
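The rule proposed in the issue (partition id modulo parallelism) can be sketched as follows. This is an illustration under stated assumptions, not Flink code: `TopicPartition` here is a hypothetical stand-in (neither Kafka's nor Flink's class), `isAssignedTo` and `claimed` are hypothetical helpers, and the topic names are made up. Since Kafka partition ids are never negative, plain `%` is safe here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class PartitionIdAssignment {

    /** Hypothetical (topic, partition id) pair; Kafka partition ids are never negative. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Rule proposed in the issue: partition id modulo parallelism. */
    static boolean isAssignedTo(TopicPartition tp, int numParallelSubtasks, int indexOfThisSubtask) {
        return tp.partition % numParallelSubtasks == indexOfThisSubtask;
    }

    /** Partitions this subtask claims from its own view of the list, sorted for comparison. */
    static List<String> claimed(List<TopicPartition> view, int numParallelSubtasks, int indexOfThisSubtask) {
        List<String> mine = new ArrayList<>();
        for (TopicPartition tp : view) {
            if (isAssignedTo(tp, numParallelSubtasks, indexOfThisSubtask)) {
                mine.add(tp.toString());
            }
        }
        Collections.sort(mine); // sorting only normalizes the output; it does not affect ownership
        return mine;
    }

    public static void main(String[] args) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < 4; p++) {
            partitions.add(new TopicPartition("orders", p));
        }
        List<TopicPartition> shuffled = new ArrayList<>(partitions);
        Collections.shuffle(shuffled, new Random(42));

        // Same claims regardless of list order: [orders-0, orders-2] both times.
        System.out.println(claimed(partitions, 2, 0) + " " + claimed(shuffled, 2, 0));

        // Four single-partition topics, parallelism 4: every id is 0, so subtask 0 gets all of them.
        List<TopicPartition> singles = Arrays.asList(
                new TopicPartition("a", 0), new TopicPartition("b", 0),
                new TopicPartition("c", 0), new TopicPartition("d", 0));
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + ": " + claimed(singles, 4, subtask));
        }
    }
}
```

The id-only rule is order-independent, as the issue says, but it ignores the topic: partitions with the same id in different topics always land on the same subtask, so many single-partition topics all end up on subtask 0. A rule that also mixes in the topic avoids this particular skew.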