[
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083960#comment-16083960
]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/4301
@StephanEwen thanks for the review. Your suggestion makes a lot of sense.
I've fixed this up as follows:
- Have a new method `KafkaTopicAssigner.assign(KafkaTopicPartition
partition, int numSubtasks)` that defines a strict contract, such that when
it is used locally to filter out partitions, the resulting distribution of the
partitions of a single topic is guaranteed to be:
1. Uniform across subtasks
2. Round-robin (strictly CLOCKWISE w.r.t. ascending subtask indices), using
the partition id as the offset from a starting index determined by the topic
name. This extra directional contract makes the assignment stricter than what
we had before, where partitions may have been round-robin assigned
counter-clockwise. It should also make the actual assignment scheme much more
predictable as perceived by the user, since they just need to know the start
index of a specific topic to understand how the partitions of that topic are
distributed across subtasks. We could add a log statement that reports the
start index of each topic being consumed. (See the sketch after this list.)
- Strengthen the tests in `KafkaConsumerPartitionAssignmentTest` to test
this contract. Uniform distribution was already tested in that suite; the
change makes the tests also verify the "clockwise round-robin from some
start index" contract (a sketch of such a check follows below).
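A minimal sketch of what such a contract could look like (the class name `KafkaTopicAssignerSketch`, the hash-based start index, and the method shape are illustrative assumptions, not necessarily the merged implementation):
```java
// Sketch of a deterministic, topic-aware partition-to-subtask assigner.
public final class KafkaTopicAssignerSketch {

    /** Returns the subtask index that should own the given partition. */
    public static int assign(String topic, int partitionId, int numSubtasks) {
        // Derive a deterministic, topic-dependent start index in [0, numSubtasks).
        // Masking with 0x7FFFFFFF avoids a negative result from hashCode().
        int startIndex = (topic.hashCode() & 0x7FFFFFFF) % numSubtasks;

        // Partition ids are assumed to be contiguous and ascending from 0, so
        // the id itself is the clockwise offset from the start index.
        return (startIndex + partitionId) % numSubtasks;
    }

    public static void main(String[] args) {
        // Example: with 4 subtasks, partitions of "my-topic" are laid out
        // clockwise starting from the topic-dependent start index.
        for (int p = 0; p < 8; p++) {
            System.out.println("partition " + p + " -> subtask " + assign("my-topic", p, 4));
        }
    }
}
```
The key property is that the start index depends only on the topic name, so every subtask computes the same layout locally, without any cross-subtask coordination.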
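And a hedged sketch of the kind of check the strengthened tests could perform (test class and method name are hypothetical):
```java
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class ClockwiseAssignmentSketchTest {

    @Test
    public void testClockwiseRoundRobinFromStartIndex() {
        int numSubtasks = 7;
        String topic = "test-topic";

        // Whatever start index the assigner picks for the topic, partition id p
        // must land exactly p steps clockwise from it.
        int startIndex = KafkaTopicAssignerSketch.assign(topic, 0, numSubtasks);
        for (int p = 1; p < 3 * numSubtasks; p++) {
            assertEquals((startIndex + p) % numSubtasks,
                KafkaTopicAssignerSketch.assign(topic, p, numSubtasks));
        }
    }
}
```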
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
> Key: FLINK-7143
> URL: https://issues.apache.org/jira/browse/FLINK-7143
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Steven Zhen Wu
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer: some partitions weren't
> assigned, and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3:
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(
>                     kafkaTopicPartitions.get(i),
>                     startupMode.getStateSentinel());
>             }
>             ...
>         }
>     }
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If
> {{kafkaTopicPartitions}} has a different order among different subtasks, the
> assignment is not stable across subtasks, which creates the assignment issue
> mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, i.e.
> {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That results in a stable assignment across subtasks that
> is independent of the ordering in the array.
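> A sketch of the loop with that proposed fix applied (elisions kept as in the
> snippet above):
> {code}
> for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>     // Mod on the partition id, not the array index, so the result is
>     // independent of the ordering of kafkaTopicPartitions on each subtask.
>     if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask) {
>         if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>             subscribedPartitionsToStartOffsets.put(
>                 kafkaTopicPartitions.get(i),
>                 startupMode.getStateSentinel());
>         }
>         ...
>     }
> }
> {code}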
> Marking it as a blocker because of its impact.