[ 
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083966#comment-16083966
 ] 

ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4301#discussion_r126952350
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * Utility for assigning Kafka partitions to consumer subtasks.
    + */
    +public class KafkaTopicPartitionAssigner {
    +
    +   /**
    +    * Returns the index of the target subtask that a specific Kafka partition should be
    +    * assigned to.
    +    *
    +    * <p>The resulting distribution of partitions of a single topic has the following contract:
    +    * <ul>
    +    *     <li>1. Uniformly distributed across subtasks</li>
    +    *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
    +    *     subtask indices) by using the partition id as the offset from a starting index
    +    *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
    +    *     determined using the topic name).</li>
    +    * </ul>
    +    *
    +    * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
    +    * contract to locally filter out partitions that it should not subscribe to, guaranteeing
    +    * that all partitions of a single topic will always be assigned to some subtask in a
    +    * uniformly distributed manner.
    +    *
    +    * @param partition the Kafka partition
    +    * @param numParallelSubtasks total number of parallel subtasks
    +    *
    +    * @return index of the target subtask that the Kafka partition should be assigned to.
    +    */
    +   public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    +           int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
    --- End diff --
    
    Minor detail: `Math.abs` does not work for `Integer.MIN_VALUE` (it returns `Integer.MIN_VALUE` itself, which is negative), so it is slightly safer to do
    ```java
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    ```
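
The edge case raised in the comment above can be reproduced in isolation. The following is a minimal sketch, not the code from the pull request: the class `StartIndexSketch` and its method names are hypothetical, and `assignSketch` only follows the round-robin contract described in the Javadoc (partition id as offset from the start index), since the rest of the `assign` method is cut off in the diff. Note that the quoted diff line takes the modulo inside `Math.abs`; the naive variant here applies `Math.abs` to the raw hash purely to make the pitfall visible.

```java
final class StartIndexSketch {

    /** Naive variant: abs on the raw hash, then modulo. The result can be negative. */
    static int absStartIndex(int topicHash, int numParallelSubtasks) {
        return Math.abs(topicHash) % numParallelSubtasks;
    }

    /** Masked variant from the review comment: clear the sign bit, then modulo. */
    static int maskedStartIndex(int topicHash, int numParallelSubtasks) {
        return (topicHash & 0x7FFFFFFF) % numParallelSubtasks;
    }

    /**
     * Assumed round-robin step (based on the Javadoc contract, not on the actual
     * method body): the partition id is used as the offset from the start index.
     */
    static int assignSketch(String topic, int partitionId, int numParallelSubtasks) {
        int startIndex = maskedStartIndex(topic.hashCode() * 31, numParallelSubtasks);
        return (startIndex + partitionId) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;

        // Math.abs cannot represent +2^31 as an int, so the value stays negative.
        System.out.println(Math.abs(hash) == Integer.MIN_VALUE); // prints "true"

        System.out.println(absStartIndex(hash, 3));    // prints "-2", not a valid subtask index
        System.out.println(maskedStartIndex(hash, 3)); // prints "0"

        // Partitions 0..3 of one topic land on four distinct subtasks.
        for (int p = 0; p < 4; p++) {
            System.out.println("partition " + p + " -> subtask " + assignSketch("my-topic", p, 4));
        }
    }
}
```

The mask always yields a value in `[0, numParallelSubtasks)`, which is what the local filtering in each subtask relies on.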


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found
> some issues with partition assignment for the Kafka consumer: some partitions
> weren't assigned, and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}.
> If {{kafkaTopicPartitions}} is ordered differently on different subtasks, the
> assignment is not stable across subtasks, which creates the assignment issue
> mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, {{if
> (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That would result in a stable assignment across subtasks
> that is independent of the ordering in the array.
> Marking it as a blocker because of its impact.
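
The instability described in the issue can be illustrated with a small standalone sketch. It is not Flink code: the class `StableAssignmentSketch` and both method names are hypothetical, and partitions are reduced to plain integer ids. It assumes two subtasks that see the same four partitions in different list orders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StableAssignmentSketch {

    /** Buggy variant: a subtask keeps the partitions whose list position matches it. */
    static List<Integer> byListIndex(List<Integer> partitionIds, int numSubtasks, int subtask) {
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (i % numSubtasks == subtask) {
                kept.add(partitionIds.get(i));
            }
        }
        return kept;
    }

    /** Proposed variant: a subtask keeps the partitions whose partition id matches it. */
    static List<Integer> byPartitionId(List<Integer> partitionIds, int numSubtasks, int subtask) {
        List<Integer> kept = new ArrayList<>();
        for (int id : partitionIds) {
            if (id % numSubtasks == subtask) {
                kept.add(id);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Same partitions, different order on each subtask.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(1, 0, 2, 3);

        // Index-based: partition 0 is taken twice, partition 1 by nobody.
        System.out.println(byListIndex(seenBySubtask0, 2, 0)); // prints "[0, 2]"
        System.out.println(byListIndex(seenBySubtask1, 2, 1)); // prints "[0, 3]"

        // Id-based: every partition is taken exactly once, regardless of order.
        System.out.println(byPartitionId(seenBySubtask0, 2, 0)); // prints "[0, 2]"
        System.out.println(byPartitionId(seenBySubtask1, 2, 1)); // prints "[1, 3]"
    }
}
```

Because the id-based filter depends only on the partition itself, each subtask reaches the same decision about a partition no matter where it appears in its local list.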



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
