Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5108
I don't quite understand this bug and fix. Any failure/recovery in Flink
will basically result in a loss of the cache. That means this does not give you
any added guarantees here - only stickiness between recoveries.
I would follow @aljoscha's advice and check what Kafka's own behavior for
this case is. The fact that Flink switches the partition when Kafka adds more
partitions may also be considered correct. Making this stateful and sticky to
its first-ever partitioning scheme also means that when scaling Kafka out, one
needs to savepoint/restore the Flink job and tell it not to pick up the
previous state. Quite tricky as well.
Is this an actual user-reported problem?
If this code is added, it needs to be optimized for performance:
- map.contains() followed by map.get() is an antipattern (can we guard
against that via SpotBugs?) because it does two hash map accesses. Just call
map.get() and check for null, which is only one hash map access.
- Because a very common case will be having only one topic, the code should be
optimized for that case, meaning memoization of the latest value/access to save
the hash map lookup when the method is always invoked with the same string
object. See the sketch after this list for both points.
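To illustrate both points, here is a minimal sketch. The class, method, and
field names (CachedPartitionLookup, getPartitions, fetchPartitions, etc.) are
hypothetical and not the PR's actual code; it only shows the single-lookup
null check plus memoization of the last topic.

```java
// Illustrative sketch only; all names are hypothetical, not the PR's code.
import java.util.HashMap;
import java.util.Map;

public class CachedPartitionLookup {

    private final Map<String, int[]> topicPartitionCache = new HashMap<>();

    // Memoize the most recent topic, since most jobs write to a single topic.
    private String lastTopic;
    private int[] lastPartitions;

    public int[] getPartitions(String topic) {
        // Fast path: same topic as the previous call (often the very same
        // String object), so no hash map access at all.
        if (topic.equals(lastTopic)) {
            return lastPartitions;
        }

        // Single map access: get() and check for null instead of
        // containsKey() followed by get().
        int[] partitions = topicPartitionCache.get(topic);
        if (partitions == null) {
            partitions = fetchPartitions(topic);
            topicPartitionCache.put(topic, partitions);
        }

        lastTopic = topic;
        lastPartitions = partitions;
        return partitions;
    }

    // Placeholder for however the partition list is actually obtained.
    private int[] fetchPartitions(String topic) {
        return new int[] {0};
    }
}
```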
---