[
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek closed FLINK-7143.
-----------------------------------
Resolution: Fixed
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
> Key: FLINK-7143
> URL: https://issues.apache.org/jira/browse/FLINK-7143
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Steven Zhen Wu
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> h3. Important Notice:
> Upgrading jobs from 1.2.x exhibits no known problems. Jobs from 1.3.0 and
> 1.3.1 with incorrect partition assignments cannot be automatically fixed by
> upgrading to Flink 1.3.2 via a savepoint, because the upgraded version would
> resume the wrong partition assignment from the savepoint. A workaround is to
> assign a different uid to the Kafka source (so its offsets won't be restored
> from the savepoint) and let it start from the latest offsets committed to
> Kafka instead. Note that this may violate exactly-once semantics and
> introduce some duplicates, because Kafka's committed offsets are not
> guaranteed to be 100% up to date with Flink's internal offset tracking. To
> maximize the alignment between the offsets in Kafka and those tracked by
> Flink, we suggest aborting the 1.3.x job via the "cancel with savepoint"
> command
> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#cancel-job-with-savepoint)
> during the upgrade process.
> h3. Original Issue Description
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found
> issues with partition assignment for the Kafka consumer: some partitions
> weren't assigned at all, while others were assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(
>                         kafkaTopicPartitions.get(i),
>                         startupMode.getStateSentinel());
>             }
>             ...
>         }
>     }
> }
> {code}
> The bug is using the array index {{i}} to mod against
> {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order
> among different subtasks, the assignment is not stable across subtasks,
> which creates the assignment issues mentioned earlier.
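The instability can be reproduced in isolation. The sketch below is a hypothetical standalone demo, not Flink code (the name {{assignByIndex}} is made up for illustration): it mimics the index-based mod, and shows that two subtasks which fetch the same four partitions in different orders end up consuming some partitions twice and others not at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnstableAssignmentDemo {

    // Mimics the buggy logic: a subtask takes partitions whose ARRAY INDEX
    // satisfies index % parallelism == subtask, so the result depends on
    // the order in which the partition metadata happened to arrive.
    static List<Integer> assignByIndex(List<Integer> partitions, int parallelism, int subtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism == subtask) {
                assigned.add(partitions.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Two subtasks see the same partitions in different (hypothetical)
        // orders; Kafka metadata gives no ordering guarantee.
        List<Integer> orderSeenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> orderSeenBySubtask1 = Arrays.asList(3, 2, 1, 0);

        List<Integer> s0 = assignByIndex(orderSeenBySubtask0, 2, 0); // indices 0, 2 -> partitions 0, 2
        List<Integer> s1 = assignByIndex(orderSeenBySubtask1, 2, 1); // indices 1, 3 -> partitions 2, 0

        // Partitions 0 and 2 are consumed twice; partitions 1 and 3 by nobody.
        System.out.println("subtask 0: " + s0);
        System.out.println("subtask 1: " + s1);
    }
}
```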
> The fix is also very simple: we should use the partition id to do the mod,
> i.e. {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks
> == indexOfThisSubtask)}}. That results in a stable assignment across
> subtasks that is independent of the ordering in the array.
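The same demo with the proposed fix applied (again a standalone sketch with a made-up name, {{assignByPartitionId}}) shows that keying the mod on the partition id makes the result order-independent: each subtask computes the same disjoint, complete assignment regardless of how the metadata list is ordered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StableAssignmentDemo {

    // The proposed fix: mod on the PARTITION ID, not the array index, so the
    // assignment no longer depends on the order of the metadata list.
    static List<Integer> assignByPartitionId(List<Integer> partitions, int parallelism, int subtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int partitionId : partitions) {
            if (partitionId % parallelism == subtask) {
                assigned.add(partitionId);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The same partitions in different orders now yield disjoint,
        // complete assignments: {0, 2} for subtask 0 and {3, 1} for subtask 1.
        System.out.println(assignByPartitionId(Arrays.asList(0, 1, 2, 3), 2, 0));
        System.out.println(assignByPartitionId(Arrays.asList(3, 2, 1, 0), 2, 1));
    }
}
```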
> Marking this as a blocker because of its impact.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)