Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/4301
Oye, this is more complicated than I thought. On `release-1.3` the
assignment actually works if the Kafka brokers always return the partitions in
the same order. The reason is that the assignment of partitions and the
assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) are
aligned. This means it's not a problem when sources think that they are
"fresh" (not restored from state) because they didn't get any state: if they
tried to assign a partition to themselves, they would also have the state for
that partition (again, because partition assignment and operator state
assignment are aligned).
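To make the alignment argument concrete, here is a small self-contained model. It is a simplified sketch, not Flink code: it assumes partitions are numbered in the order the brokers return them, that one state entry exists per partition in that same order, and that both the partition assignment and the round-robin state redistribution start at subtask 0. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink code) of the aligned case: partition
 * self-assignment and round-robin state redistribution both start at subtask 0.
 */
class AlignedAssignment {

    // Partitions a subtask would assign to itself if it believes it is "fresh".
    static List<Integer> partitionsAssignedWhenFresh(int subtask, int numPartitions, int parallelism) {
        List<Integer> result = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtask) {
                result.add(p);
            }
        }
        return result;
    }

    // Per-partition state entries a subtask receives when the entries are
    // dealt out round-robin, starting at subtask 0.
    static List<Integer> stateReceived(int subtask, int numPartitions, int parallelism) {
        List<Integer> result = new ArrayList<>();
        int next = 0;
        for (int p = 0; p < numPartitions; p++) {
            if (next == subtask) {
                result.add(p);
            }
            next = (next + 1) % parallelism;
        }
        return result;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        int parallelism = 4;
        for (int s = 0; s < parallelism; s++) {
            // Subtask 3 receives no state, but it would not self-assign any partition either.
            System.out.println("subtask " + s
                    + ": state=" + stateReceived(s, numPartitions, parallelism)
                    + " fresh-assignment=" + partitionsAssignedWhenFresh(s, numPartitions, parallelism));
        }
    }
}
```

With 3 partitions and parallelism 4, subtask 3 gets no state and thinks it is fresh, yet its fresh assignment is also empty, so nothing is read twice.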
This PR breaks the alignment because `startIndex` is not necessarily
`0`. However, no test catches this, because `StateAssignmentOperation` has an
optimisation that skips repartitioning operator state when the parallelism
doesn't change. If we deactivate that optimisation by changing this line to
`if (true)`:
https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561
the test in Kafka09ITCase will fail.
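The failure mode can be sketched by extending the same kind of model with an offset. Again this is a simplified, hypothetical model rather than the PR's code: `startIndex` is a free parameter here (how the PR computes it is not modelled), partition `p` is self-assigned to subtask `(startIndex + p) % parallelism`, while redistributed state still goes round-robin from subtask 0. It only applies when state is actually redistributed (rescaling, or the `if (true)` change above); with unchanged parallelism the optimisation leaves each subtask's state where it was written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model (not Flink code): self-assignment is offset by startIndex,
 * but state redistribution is not, so the two can disagree.
 */
class MisalignedAssignment {

    static int partitionOwner(int partition, int startIndex, int parallelism) {
        return (startIndex + partition) % parallelism;
    }

    static int stateOwner(int partition, int parallelism) {
        return partition % parallelism;
    }

    // A subtask with state resumes the partitions in its state; a subtask with no
    // state believes it is fresh and self-assigns using startIndex.
    static List<Integer> partitionsRead(int subtask, int numPartitions, int startIndex, int parallelism) {
        List<Integer> fromState = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (stateOwner(p, parallelism) == subtask) {
                fromState.add(p);
            }
        }
        if (!fromState.isEmpty()) {
            return fromState;
        }
        List<Integer> fresh = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (partitionOwner(p, startIndex, parallelism) == subtask) {
                fresh.add(p);
            }
        }
        return fresh;
    }

    // Number of subtasks reading each partition after the restore.
    static int[] readersPerPartition(int numPartitions, int startIndex, int parallelism) {
        int[] readers = new int[numPartitions];
        for (int s = 0; s < parallelism; s++) {
            for (int p : partitionsRead(s, numPartitions, startIndex, parallelism)) {
                readers[p]++;
            }
        }
        return readers;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(readersPerPartition(3, 0, 4))); // [1, 1, 1]
        System.out.println(Arrays.toString(readersPerPartition(3, 1, 4))); // [1, 1, 2]
    }
}
```

With `startIndex = 1`, subtask 3 gets no state, thinks it is fresh and self-assigns partition 2, which subtask 2 is already resuming from state, so partition 2 ends up with two readers.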
The fix is to properly forward the information about whether we were restored
in `initializeState()`; I made a commit for that:
https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings. The
problem is that it is not easy to change the tests to catch this bug. I think
an ITCase that uses Kafka and does a savepoint and rescaling would do the trick.
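The difference between inferring "fresh" from an empty state and using an explicit restored flag can be sketched as below. This is a hypothetical illustration, not the linked commit: the `restored` argument stands in for what a flag such as Flink's `FunctionInitializationContext#isRestored()` reports in `initializeState()`, and the method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model (not the actual fix): decide self-assignment from an
 * explicit restored flag instead of from "did I receive any state?".
 */
class RestoredFlagDecision {

    // Explicit flag: a restored subtask only resumes what its state contains, even if that is nothing.
    static List<Integer> partitionsToRead(boolean restored,
                                          List<Integer> stateForSubtask,
                                          List<Integer> freshAssignment) {
        if (restored) {
            return new ArrayList<>(stateForSubtask);
        }
        // Genuinely fresh start: no state exists anywhere, so self-assign.
        return new ArrayList<>(freshAssignment);
    }

    // Inferred freshness: "no state received" is treated as "not restored".
    static List<Integer> partitionsToReadInferred(List<Integer> stateForSubtask,
                                                  List<Integer> freshAssignment) {
        return stateForSubtask.isEmpty()
                ? new ArrayList<>(freshAssignment)
                : new ArrayList<>(stateForSubtask);
    }

    public static void main(String[] args) {
        // A restored subtask that received no state but would self-assign partition 2 when fresh.
        List<Integer> noState = Collections.emptyList();
        List<Integer> fresh = Collections.singletonList(2);
        System.out.println(partitionsToReadInferred(noState, fresh)); // [2]
        System.out.println(partitionsToRead(true, noState, fresh));   // []
    }
}
```

The inferred variant reproduces the duplicate reader from the misaligned case, while the explicit flag keeps the restored subtask idle.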