Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5108
  
    I don't quite understand this bug and fix. Any failure/recovery in Flink 
will basically result in a loss of the cache. That means this does not give you 
any added guarantees here - only stickiness between recoveries.
    
    I would follow @aljoscha's advice and check what Kafka's own behavior for 
this is. The fact that Flink switches the partition when Kafka adds more topics 
may also be considered correct. Making this stateful and sticky to its 
first-ever partitioning scheme also means that when scaling Kafka out, one 
needs to savepoint/restore the Flink job and tell it not to pick up previous 
state. Quite tricky as well.
    
    Is this an actual user-reported problem?
    
    If this code is added, it needs to be performance-optimized (see the sketch 
below):
      - map.contains() followed by map.get() is an antipattern (can we guard 
for that via spotbugs?) - two hash map accesses. Just do map.get() and check 
for null, which is only one hash map access.
      - Because a very common case will be having one topic, the code should be 
optimized for one topic, meaning memoization of the latest value/access to save 
the hash map lookup when the method is always invoked with the same string 
object.
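
    A minimal sketch of what is meant, assuming a simple topic-to-partition 
cache (class, method, and field names below are hypothetical, not taken from 
the PR): a single map.get() with a null check replaces contains()+get(), and 
the last (topic, partition) pair is memoized for the single-topic case.

```java
import java.util.HashMap;
import java.util.Map;

public class CachedPartitionLookup {

    private final Map<String, Integer> topicToPartition = new HashMap<>();

    // Memoized last lookup; most producers write to a single topic.
    private String lastTopic;
    private int lastPartition;

    public int partitionFor(String topic, int[] partitions) {
        // Fast path: same topic as the previous call, no hash map access at all.
        if (topic.equals(lastTopic)) {
            return lastPartition;
        }

        // One hash map access: get() plus null check instead of contains() + get().
        Integer cached = topicToPartition.get(topic);
        if (cached == null) {
            cached = choosePartition(partitions);
            topicToPartition.put(topic, cached);
        }

        lastTopic = topic;
        lastPartition = cached;
        return cached;
    }

    private int choosePartition(int[] partitions) {
        // Placeholder selection; the actual scheme is up to the partitioner.
        return partitions[0];
    }
}
```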

