I think these two Jira issues are relevant here:
- https://issues.apache.org/jira/browse/FLINK-10342
- https://issues.apache.org/jira/browse/FLINK-9303
The second one only because it’s slightly related. The first one is actually exactly this thread.

I was against changing this behaviour in the Jira but I can now see that this is quite likely an issue.

Aljoscha

> On 13. Feb 2019, at 18:55, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> Hi!
>
> I agree that it’s very confusing if you explicitly specify the topics that
> are to be consumed and what happens is different.
>
> I would almost consider this to be a bug; I can’t see any reasonable use
> case, just hard-to-debug problems.
>
> Having an option would be a good start but I would rather treat this as a
> bug.
>
> Gyula
>
> On Wed, 13 Feb 2019 at 18:27, Feng LI <nemoking...@gmail.com> wrote:
>
>> Hello there,
>>
>> I’m just wondering if there are real-world use cases for maintaining this
>> default behavior. It’s a bit counterintuitive and sometimes results in
>> serious production issues. (We had a similar issue when changing the topic
>> name, which resulted in reading every message twice: both from the old
>> topic and from the new one.)
>>
>> Cheers,
>> Feng
>>
>> On Wed, 13 Feb 2019 at 17:56, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Partition offsets stored in state will always be respected when the
>>> consumer is restored from checkpoints / savepoints.
>>> AFAIK, this seems to have been the behaviour for quite some time now
>>> (since FlinkKafkaConsumer08).
>>>
>>> I think in the past there was some discussion to at least allow some way
>>> to ignore restored partition offsets.
>>> One way to enable this is to filter the restored partition offsets based
>>> on the configured list of specified topics / topic regex pattern in the
>>> current execution. This should work, since this can only be modified when
>>> restoring from savepoints (i.e. manual restores).
>>>
>>> To avoid breaking the current behaviour, we can maybe add a
>>> `filterRestoredPartitionOffsetState()` configuration on the consumer,
>>> which by default is disabled to match the current behaviour.
>>>
>>> What do you think?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Wed, Feb 13, 2019 at 11:59 PM Gyula Fóra <gyula.f...@gmail.com>
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> I have run into a weird issue which I could have sworn wouldn't
>>>> happen :D
>>>> I feel there was a discussion about this in the past, but maybe I'm
>>>> wrong; I hope someone can point me to a ticket.
>>>>
>>>> Let's say you create a Kafka consumer that consumes (t1,t2,t3), you
>>>> take a savepoint and deploy a new version that only consumes (t1).
>>>>
>>>> The restore logic now still starts to consume (t1,t2,t3), which feels
>>>> very unintuitive as those were explicitly removed from the list. It is
>>>> also hard to debug as the topics causing the problem are not defined
>>>> anywhere in your job, configs etc.
>>>>
>>>> Has anyone run into this issue? Should we change this default behaviour
>>>> or at least have an option to not do this?
>>>>
>>>> Cheers,
>>>> Gyula
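
For reference, the filtering Gordon describes in the quoted thread (dropping restored partition offsets whose topic no longer matches the configured topic list or topic regex pattern) could look roughly like the sketch below. This is a standalone illustration under stated assumptions, not Flink code: the class `RestoredOffsetFilter` and its methods are hypothetical names, and the restored state is modelled as a plain map from topic name to (partition id -> offset) rather than the connector's real state type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the Flink Kafka connector. */
final class RestoredOffsetFilter {

    private RestoredOffsetFilter() {}

    /**
     * Returns a copy of the restored offsets that keeps only the topics accepted
     * by the current subscription. Keys are topic names; each value maps a
     * partition id to its restored offset.
     */
    static Map<String, Map<Integer, Long>> filter(
            Map<String, Map<Integer, Long>> restoredOffsets,
            Predicate<String> isSubscribed) {
        Map<String, Map<Integer, Long>> kept = new HashMap<>();
        for (Map.Entry<String, Map<Integer, Long>> entry : restoredOffsets.entrySet()) {
            if (isSubscribed.test(entry.getKey())) {
                // Copy the inner map so the result does not alias the restored state.
                kept.put(entry.getKey(), new HashMap<>(entry.getValue()));
            }
        }
        return kept;
    }

    /** Subscription expressed as a fixed list of topics. */
    static Predicate<String> topics(Set<String> topics) {
        return topics::contains;
    }

    /** Subscription expressed as a topic regex pattern (must match the whole name). */
    static Predicate<String> pattern(Pattern topicPattern) {
        return topic -> topicPattern.matcher(topic).matches();
    }
}
```

With Gyula's example, restored state for (t1,t2,t3) filtered through `RestoredOffsetFilter.topics(Set.of("t1"))` would keep only the offsets for t1. Whether such a filter should run by default or only behind an opt-in switch is exactly the open question in this thread.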