[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083966#comment-16083966 ]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4301#discussion_r126952350
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Utility for assigning Kafka partitions to consumer subtasks.
+ */
+public class KafkaTopicPartitionAssigner {
+
+ /**
+ * Returns the index of the target subtask that a specific Kafka partition should be
+ * assigned to.
+ *
+ * <p>The resulting distribution of partitions of a single topic has the following contract:
+ * <ul>
+ * <li>1. Uniformly distributed across subtasks</li>
+ * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
+ * subtask indices) by using the partition id as the offset from a starting index
+ * (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
+ * determined using the topic name).</li>
+ * </ul>
+ *
+ * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+ * contract to locally filter out partitions that it should not subscribe to, guaranteeing
+ * that all partitions of a single topic will always be assigned to some subtask in a
+ * uniformly distributed manner.
+ *
+ * @param partition the Kafka partition
+ * @param numParallelSubtasks total number of parallel subtasks
+ *
+ * @return index of the target subtask that the Kafka partition should be assigned to.
+ */
+ public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
+ int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
--- End diff --
Minor detail: `Math.abs` does not work for `Integer.MIN_VALUE`, so it is
slightly safer to do
```java
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
```
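The reviewer's point can be checked directly: `Math.abs(Integer.MIN_VALUE)` overflows and stays negative, while masking with `0x7FFFFFFF` always clears the sign bit. The sketch below also fills in the round-robin step from the Javadoc contract, assuming the target index is `(startIndex + partitionId) % numParallelSubtasks`. The diff is cut off before the rest of the method body, so that step is an assumption, and the class name `AssignerSketch` and the `(String, int, int)` signature are hypothetical stand-ins for `KafkaTopicPartitionAssigner.assign(KafkaTopicPartition, int)`.

```java
public class AssignerSketch {

    // Hypothetical stand-in for KafkaTopicPartitionAssigner.assign(...): takes the
    // topic name and partition id directly instead of a KafkaTopicPartition.
    static int assign(String topic, int partitionId, int numParallelSubtasks) {
        // Clear the sign bit instead of calling Math.abs, so the start index is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Assumed round-robin step: partition 0 goes to startIndex, partition 1 to the
        // next subtask index, wrapping around at numParallelSubtasks.
        return (startIndex + partitionId) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        // Math.abs overflows for Integer.MIN_VALUE and returns it unchanged (negative).
        System.out.println(Math.abs(Integer.MIN_VALUE));    // -2147483648
        // Masking keeps only the low 31 bits, which is always non-negative.
        System.out.println(Integer.MIN_VALUE & 0x7FFFFFFF); // 0

        int parallelism = 3;
        for (int p = 0; p < 6; p++) {
            System.out.println("partition " + p + " -> subtask " + assign("my-topic", p, parallelism));
        }
    }
}
```

Because consecutive partition ids map to consecutive subtask indices, N partitions spread over N/k partitions per subtask whenever k divides N, which is the "uniformly distributed" part of the contract.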
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
> Key: FLINK-7143
> URL: https://issues.apache.org/jira/browse/FLINK-7143
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Steven Zhen Wu
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer: some partitions weren't
> assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
>         }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If
> {{kafkaTopicPartitions}} has a different order on different subtasks, the
> assignment is not stable across subtasks, which causes the assignment issues
> mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod,
> {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}.
> That would result in a stable assignment across subtasks that is independent of
> the ordering in the array.
> Marking it as a blocker because of its impact.
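The failure described in the report can be reproduced with a small simulation: each subtask receives the same partitions in a different order and applies either the index-based check ({{i % numParallelSubtasks}}) or the proposed partition-id check. This is a hypothetical single-topic sketch, not connector code; partitions are plain integers, and `AssignmentStability`, `byListIndex`, `byPartitionId` and `claimCounts` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignmentStability {

    // Rule from the report: keep the partition at list position i if i % n == subtask.
    static List<Integer> byListIndex(List<Integer> partitions, int n, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % n == subtask) {
                mine.add(partitions.get(i));
            }
        }
        return mine;
    }

    // Proposed fix: decide by partition id, so the list order no longer matters.
    static List<Integer> byPartitionId(List<Integer> partitions, int n, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int p : partitions) {
            if (p % n == subtask) {
                mine.add(p);
            }
        }
        return mine;
    }

    // Counts how many subtasks claimed each partition; a missing key means unassigned,
    // a count above 1 means the partition is consumed more than once.
    static Map<Integer, Integer> claimCounts(List<List<Integer>> views, boolean useIndex) {
        int n = views.size();
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int subtask = 0; subtask < n; subtask++) {
            List<Integer> view = views.get(subtask);
            List<Integer> mine = useIndex
                    ? byListIndex(view, n, subtask)
                    : byPartitionId(view, n, subtask);
            for (int p : mine) {
                counts.merge(p, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two subtasks see the same four partitions, but in different orders.
        List<List<Integer>> views = Arrays.asList(
                Arrays.asList(0, 1, 2, 3),  // subtask 0's view
                Arrays.asList(1, 0, 3, 2)); // subtask 1's view
        System.out.println("by index:        " + claimCounts(views, true));  // {0=2, 2=2}
        System.out.println("by partition id: " + claimCounts(views, false)); // {0=1, 1=1, 2=1, 3=1}
    }
}
```

With the index-based check, partitions 0 and 2 are claimed twice and partitions 1 and 3 not at all, matching both symptoms in the report. Note that the partition-id check alone places partition 0 of every topic on subtask 0; the assigner in the diff above offsets each topic by a start index derived from the topic name, per its Javadoc.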
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)