GitHub user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Oye, this is more complicated than I thought. On `release-1.3` the 
assignment actually works if the Kafka brokers always return the partitions in 
the same order. The reason is that the assignment of partitions and the 
assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) are 
aligned. This means that it's not a problem when sources think that they are 
"fresh" (not restored from state) because they didn't get any state. If they 
tried to assign a partition to themselves, this would also mean that they have 
the state for that partition (again, because partition assignment and operator 
state assignment are aligned). 
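
    To make the alignment argument concrete, here is a simplified model, not 
Flink's actual code. The class and method names are hypothetical, and it 
assumes that the state entries arrive in the same order as the partitions and 
that both are distributed by plain modulo round-robin.

```java
import java.util.ArrayList;
import java.util.List;

public class AlignmentSketch {

    // Hypothetical model: subtask that reads partition i when the round-robin
    // partition assignment starts at startIndex.
    public static int partitionOwner(int partitionIndex, int startIndex, int parallelism) {
        return (startIndex + partitionIndex) % parallelism;
    }

    // Hypothetical model: subtask that receives the i-th state entry when
    // operator state is redistributed round-robin starting at subtask 0.
    public static int stateOwner(int stateIndex, int parallelism) {
        return stateIndex % parallelism;
    }

    // Partitions whose reading subtask differs from the subtask holding their state.
    public static List<Integer> misaligned(int numPartitions, int startIndex, int parallelism) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            if (partitionOwner(i, startIndex, parallelism) != stateOwner(i, parallelism)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // startIndex 0: both assignments agree for every partition.
        System.out.println(misaligned(4, 0, 3)); // prints []
        // startIndex 1: every partition is read by a subtask without its state.
        System.out.println(misaligned(4, 1, 3)); // prints [0, 1, 2, 3]
    }
}
```

    In this model, a subtask that received no state with `startIndex == 0` 
also owns no partitions, so treating itself as "fresh" is harmless. With a 
non-zero `startIndex` that no longer holds.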
    
    This PR breaks the alignment because the `startIndex` is not necessarily 
`0`. However, this is not caught by any tests because the 
`StateAssignmentOperation` has an optimisation where it doesn't repartition 
operator state if the parallelism doesn't change. If we deactivate that 
optimisation by turning the following line into `if (true)`, the test in 
`Kafka09ITCase` will fail: 
https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561
    
    The fix is to properly forward the information of whether we're restored in 
`initializeState()`. I did a commit for that: 
https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings
The problem is that it is not easy to change the tests to catch this bug. I 
think an ITCase that uses Kafka and does a savepoint and rescaling would do 
the trick.
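
    As a rough illustration of what "forwarding the restored information" 
could look like, here is a self-contained sketch. It is not the code from the 
commit above and does not use Flink's classes: `SourceSketch`, its fields and 
its method signatures are all hypothetical. The idea it assumes is that a 
source which was restored, but received no state, must not fall back to a 
fresh partition assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RestoredFlagSketch {

    // Hypothetical stand-in for a source operator; not a Flink class.
    public static class SourceSketch {
        private boolean restored = false;
        private List<Integer> restoredPartitions = new ArrayList<>();

        // Remember explicitly whether we were restored, instead of inferring
        // it from whether any state happened to arrive at this subtask.
        public void initializeState(boolean isRestored, List<Integer> stateForThisSubtask) {
            this.restored = isRestored;
            this.restoredPartitions = new ArrayList<>(stateForThisSubtask);
        }

        // A restored subtask only reads what its state says, even if that is
        // nothing. Only a truly fresh subtask uses the computed assignment.
        public List<Integer> open(List<Integer> freshAssignment) {
            return restored ? restoredPartitions : freshAssignment;
        }
    }

    public static void main(String[] args) {
        SourceSketch restoredWithoutState = new SourceSketch();
        restoredWithoutState.initializeState(true, Collections.<Integer>emptyList());
        System.out.println(restoredWithoutState.open(Arrays.asList(0, 3))); // prints []

        SourceSketch fresh = new SourceSketch();
        fresh.initializeState(false, Collections.<Integer>emptyList());
        System.out.println(fresh.open(Arrays.asList(0, 3))); // prints [0, 3]
    }
}
```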


