[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083966#comment-16083966 ]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4301#discussion_r126952350

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Utility for assigning Kafka partitions to consumer subtasks.
+ */
+public class KafkaTopicPartitionAssigner {
+
+    /**
+     * Returns the index of the target subtask that a specific Kafka partition should be
+     * assigned to.
+     *
+     * <p>The resulting distribution of partitions of a single topic has the following contract:
+     * <ul>
+     *     <li>1. Uniformly distributed across subtasks</li>
+     *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
+     *             subtask indices) by using the partition id as the offset from a starting index
+     *             (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
+     *             determined using the topic name).</li>
+     * </ul>
+     *
+     * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+     * contract to locally filter out partitions that it should not subscribe to, guaranteeing
+     * that all partitions of a single topic will always be assigned to some subtask in a
+     * uniformly distributed manner.
+     *
+     * @param partition the Kafka partition
+     * @param numParallelSubtasks total number of parallel subtasks
+     *
+     * @return index of the target subtask that the Kafka partition should be assigned to.
+     */
+    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
+        int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
--- End diff --

Minor detail: `Math.abs` does not work for `Integer.MIN_VALUE` (it returns `Integer.MIN_VALUE`, which is still negative), so it is slightly safer to clear the sign bit instead:

```java
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
```

> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer: some partitions weren't
> assigned, and some partitions got assigned more than once.
> Here is the bug, introduced in Flink 1.3:
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...)
> {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
> }
> {code}
> The bug is that the list index {{i}} is taken mod {{numParallelSubtasks}}. If
> {{kafkaTopicPartitions}} has a different order on different subtasks, the
> assignment is not stable across subtasks, which causes the assignment issues
> described above.
> The fix is also very simple: we should use the partition id for the mod, i.e.
> {{if (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}.
> That results in a stable assignment across subtasks that is independent of the
> ordering of the list.
> Marking it as a blocker because of its impact.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
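The review note about `Math.abs` can be checked in isolation. Below is a minimal sketch, assuming the topic is a plain `String` rather than a `KafkaTopicPartition`, and assuming the elided remainder of `assign` offsets the start index by the partition id modulo the parallelism, as the Javadoc contract describes (the diff excerpt stops before that line). The class name `TopicPartitionAssignSketch` and its methods are hypothetical.

```java
public class TopicPartitionAssignSketch {

    // Start index as written in the diff: the modulo runs first, then Math.abs.
    static int startIndexAbs(String topic, int numParallelSubtasks) {
        return Math.abs(topic.hashCode() * 31 % numParallelSubtasks);
    }

    // Start index as suggested in the review: clear the sign bit, then take the modulo.
    static int startIndexMasked(String topic, int numParallelSubtasks) {
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    }

    // Hypothetical completion of assign(): offset the start index by the partition id,
    // wrapping around the subtasks. The diff excerpt ends before this step.
    static int assign(String topic, int partitionId, int numParallelSubtasks) {
        return (startIndexMasked(topic, numParallelSubtasks) + partitionId) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        // Math.abs cannot represent 2^31, so it returns Integer.MIN_VALUE unchanged.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // prints -2147483648
        // The mask only clears the sign bit, so the result is never negative.
        System.out.println(Integer.MIN_VALUE & 0x7FFFFFFF); // prints 0

        int n = 3;
        // Both variants give a start index in [0, n), but they need not agree.
        System.out.println(startIndexAbs("orders", n) + " vs " + startIndexMasked("orders", n));

        // Consecutive partition ids land on consecutive subtasks, wrapping at n.
        for (int p = 0; p < 8; p++) {
            System.out.println("partition " + p + " -> subtask " + assign("orders", p, n));
        }
    }
}
```

In the diff's expression, Java evaluates `hashCode() * 31 % numParallelSubtasks` left to right, so `Math.abs` only ever sees a value in `(-numParallelSubtasks, numParallelSubtasks)`. The `Integer.MIN_VALUE` case bites most directly in the `Math.abs(hash) % n` ordering, where it can produce a negative index. The mask form stays non-negative regardless of how the expression is later rearranged.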
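The ordering problem described in the report can be reproduced with plain integers standing in for `KafkaTopicPartition` ids of a single topic. In this minimal sketch, the class `AssignmentDemo` and its helpers are hypothetical. Two subtasks list the same four partitions in different orders. The list-index rule then hands partitions 0 and 2 to both subtasks while 1 and 3 are never assigned, matching the symptoms in the report. Filtering on the partition id, as the report proposes, gives each partition to exactly one subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AssignmentDemo {

    // Buggy rule from the report: subtask k takes the element at list index i when i % n == k.
    // The outcome depends on the order in which this subtask happened to list the partitions.
    static List<Integer> byListIndex(List<Integer> partitions, int n, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % n == subtask) {
                mine.add(partitions.get(i));
            }
        }
        return mine;
    }

    // Proposed fix: decide on the partition id itself, so list order does not matter.
    static List<Integer> byPartitionId(List<Integer> partitions, int n, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int partitionId : partitions) {
            if (partitionId % n == subtask) {
                mine.add(partitionId);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        int n = 2;
        // Both subtasks discovered the same four partitions, but in different orders.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(1, 0, 3, 2);

        System.out.println(byListIndex(seenBySubtask0, n, 0));   // prints [0, 2]
        System.out.println(byListIndex(seenBySubtask1, n, 1));   // prints [0, 2]
        System.out.println(byPartitionId(seenBySubtask0, n, 0)); // prints [0, 2]
        System.out.println(byPartitionId(seenBySubtask1, n, 1)); // prints [1, 3]
    }
}
```

The assigner in the diff goes one step further than the report's fix: per its Javadoc, it offsets from a start index derived from the topic name instead of always starting at subtask 0.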