Hi,

Partition offsets stored in state will always be respected when the consumer is restored from checkpoints / savepoints. AFAIK, this seems to have been the behaviour for quite some time now (since FlinkKafkaConsumer08).
I think there was some discussion in the past about at least allowing some way to ignore restored partition offsets. One way to enable this would be to filter the restored partition offsets against the list of topics / topic regex pattern configured in the current execution. This should work, since that configuration can only be modified when restoring from a savepoint (i.e. a manual restore). To avoid breaking the current behaviour, we could add a `filterRestoredPartitionOffsetState()` configuration on the consumer, disabled by default to match the current behaviour.

What do you think?

Cheers,
Gordon

On Wed, Feb 13, 2019 at 11:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi!
>
> I have run into a weird issue which I could have sworn wouldn't happen :D
> I feel there was a discussion about this in the past, but maybe I'm wrong.
> I hope someone can point me to a ticket.
>
> Let's say you create a Kafka consumer that consumes (t1, t2, t3), you take
> a savepoint and deploy a new version that only consumes (t1).
>
> The restore logic still starts consuming (t1, t2, t3), which feels very
> unintuitive as those topics were explicitly removed from the list. It is
> also hard to debug, as the topics causing the problem are not defined
> anywhere in your job, configs, etc.
>
> Has anyone run into this issue? Should we change this default behaviour,
> or at least have an option to not do this?
>
> Cheers,
> Gyula
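A minimal sketch of the proposed filtering, assuming the restored state can be viewed as a map from topic partition to offset. `TopicPartition`, `RestoredOffsetFilter`, `filterRestoredOffsets` and the `filterEnabled` flag are hypothetical names for illustration only, not Flink's actual classes or API; `filterEnabled` stands in for the proposed opt-in setting, which would be disabled by default.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class RestoredOffsetFilter {

    /** Hypothetical stand-in for a Kafka topic partition (not Flink's KafkaTopicPartition). */
    record TopicPartition(String topic, int partition) {}

    /**
     * Keeps only restored offsets whose topic is still subscribed in the current job,
     * either via an explicit topic list or via a topic regex pattern.
     * When filterEnabled is false, all restored offsets are kept (current behaviour).
     */
    static Map<TopicPartition, Long> filterRestoredOffsets(
            Map<TopicPartition, Long> restored,
            List<String> topics,       // explicitly subscribed topics, or null
            Pattern topicPattern,      // subscribed topic pattern, or null
            boolean filterEnabled) {   // hypothetical opt-in flag, off by default
        if (!filterEnabled) {
            return new LinkedHashMap<>(restored);
        }
        Map<TopicPartition, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restored.entrySet()) {
            String topic = entry.getKey().topic();
            boolean subscribed =
                    (topics != null && topics.contains(topic))
                    || (topicPattern != null && topicPattern.matcher(topic).matches());
            if (subscribed) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // State restored from a savepoint taken while consuming (t1, t2, t3).
        Map<TopicPartition, Long> restored = new LinkedHashMap<>();
        restored.put(new TopicPartition("t1", 0), 42L);
        restored.put(new TopicPartition("t2", 0), 7L);
        restored.put(new TopicPartition("t3", 1), 13L);

        // The new job version only subscribes to t1.
        Map<TopicPartition, Long> kept =
                filterRestoredOffsets(restored, List.of("t1"), null, true);
        System.out.println(kept); // prints {TopicPartition[topic=t1, partition=0]=42}
    }
}
```

Keeping the flag off by default means existing jobs restore exactly as before; only users who opt in would have offsets for removed topics dropped on restore.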