[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090113#comment-16090113 ]

ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Oye, this is more complicated than I thought. On `release-1.3` the 
assignment actually works, provided the Kafka brokers always return the 
partitions in the same order. The reason is that the assignment of partitions 
and the assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) 
are aligned. This means it's not a problem when sources think they are "fresh" 
(not restored from state) because they didn't get any state: if they try to 
assign a partition to themselves, this also means they have the state for that 
partition (again, because partition assignment and operator state assignment 
are aligned). 
    
    This PR breaks that alignment because the `startIndex` is not necessarily 
`0`. However, this is not caught by any tests because the 
`StateAssignmentOperation` has an optimisation where it doesn't repartition 
operator state if the parallelism doesn't change. If we deactivate that 
optimisation by turning this line into `if (true)`: 
https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561
 then the test in Kafka09ITCase will fail.
    
    The fix is to properly forward the information of whether we're restored 
in `initializeState()`; I pushed a commit for that: 
https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings. The 
problem is that it is not easy to change the tests to catch this bug. I think 
an ITCase that uses Kafka and performs a savepoint plus rescaling would do the 
trick.
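The alignment argument above can be illustrated with a toy model (this is not Flink's actual code; `roundRobin` and its parameters are made up for this sketch). Both release-1.3 partition assignment and round-robin state repartitioning hand element `i` to subtask `i % parallelism`; a non-zero `startIndex` shifts one of the two distributions and breaks the alignment:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AlignmentDemo {

    // Simplified round-robin distribution: element i goes to subtask
    // (i + startIndex) % parallelism. With startIndex == 0 this models both
    // the release-1.3 partition assignment and the state repartitioning.
    static Map<Integer, List<Integer>> roundRobin(
            List<Integer> items, int parallelism, int startIndex) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int i = 0; i < items.size(); i++) {
            bySubtask.computeIfAbsent((i + startIndex) % parallelism,
                    k -> new ArrayList<>()).add(items.get(i));
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3);
        int parallelism = 2;

        // Aligned: partitions and restored state are distributed identically,
        // so each subtask's self-assigned partitions match its state.
        System.out.println(roundRobin(partitions, parallelism, 0)
                .equals(roundRobin(partitions, parallelism, 0))); // true

        // Misaligned: a non-zero startIndex shifts partition assignment while
        // state repartitioning still starts at 0.
        System.out.println(roundRobin(partitions, parallelism, 1)
                .equals(roundRobin(partitions, parallelism, 0))); // false
    }
}
```

With the shifted `startIndex`, subtask 0 now claims partitions whose state was handed to subtask 1, which is exactly the situation the repartitioner optimisation was hiding.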


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found 
> some issues with partition assignment for the Kafka consumer: some 
> partitions weren't assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3. 
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(
>                     kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. 
> If {{kafkaTopicPartitions}} has a different order among different subtasks, 
> the assignment is not stable across subtasks, which creates the assignment 
> issues mentioned earlier. 
> The fix is also very simple: we should use the partition id to do the mod, 
> i.e. {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks 
> == indexOfThisSubtask)}}. That results in a stable assignment across subtasks 
> that is independent of the ordering in the array.
> Marking this as a blocker because of its impact.
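The difference between index-based and partition-id-based assignment can be shown with a self-contained sketch (class and method names are made up for this illustration; partitions are modeled as plain ids):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class AssignmentDemo {

    // Buggy variant: assigns by list index, so the result depends on the
    // order in which the broker returned the partitions.
    static Set<Integer> assignByIndex(List<Integer> partitions, int numSubtasks, int subtask) {
        Set<Integer> assigned = new TreeSet<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numSubtasks == subtask) {
                assigned.add(partitions.get(i));
            }
        }
        return assigned;
    }

    // Fixed variant: assigns by partition id, independent of ordering.
    static Set<Integer> assignById(List<Integer> partitions, int numSubtasks, int subtask) {
        Set<Integer> assigned = new TreeSet<>();
        for (int p : partitions) {
            if (p % numSubtasks == subtask) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<Integer> orderA = Arrays.asList(0, 1, 2, 3);
        List<Integer> orderB = Arrays.asList(3, 2, 1, 0); // broker returns a different order
        int numSubtasks = 2, subtask = 0;

        // Index-based assignment changes with the ordering, so two subtasks
        // seeing different orderings can double-assign or skip partitions.
        System.out.println(assignByIndex(orderA, numSubtasks, subtask)); // [0, 2]
        System.out.println(assignByIndex(orderB, numSubtasks, subtask)); // [1, 3]

        // Id-based assignment is stable regardless of ordering.
        System.out.println(assignById(orderA, numSubtasks, subtask)); // [0, 2]
        System.out.println(assignById(orderB, numSubtasks, subtask)); // [0, 2]
    }
}
```

If subtask 0 sees `orderA` and subtask 1 sees `orderB`, the index-based scheme assigns partition 0 to both of them and partition 3 to neither, which is exactly the duplicate/missing assignment reported above.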



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
