Thanks everyone for joining the discussion!

@Martijn:
> All newly discovered partitions will be consumed from the earliest offset possible.

Thanks for the reminder! I checked the logic of KafkaSource and found that new partitions start from the offset initializer specified by the user instead of the earliest offset. We need to correct this behavior to avoid dropping messages from new partitions.

> Job restarts from checkpoint

I think the current logic guarantees exactly-once semantics: new partitions created after the checkpoint will be rediscovered and picked up by the source.

@John:
> If you want to be a little conservative with the default, 5 minutes might be better than 30 seconds.

Thanks for the suggestion! I tried to find the equivalent config in Kafka but missed it. It would be neat to align with the default value of "metadata.max.age.ms".

@Gabor:
> removed partition handling is not yet added

There was a detailed discussion about removing partitions [1], but it looks like this is not an easy task considering the potential data loss and state inconsistency. I'm afraid there's no clear plan for this one yet; maybe we could start a new discussion thread about how to correctly handle removed partitions.

[1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt

Best regards,
Qingsheng

On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> +1 on the overall direction, it's an important feature.
>
> I've had a look on the latest master and looks like removed partition
> handling is not yet added but I think this is essential.
>
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>
> If a partition all of a sudden disappears then it could lead to data loss.
> Are you planning to add it?
> If yes then when?
>
> G
>
> On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for this proposal, Qingsheng!
> >
> > If you want to be a little conservative with the default, 5 minutes might
> > be better than 30 seconds.
> >
> > The equivalent config in Kafka seems to be metadata.max.age.ms
> > (https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
> > which has a default value of 5 minutes.
> >
> > Other than that, I’m in favor. I agree, this should be on by default.
> >
> > Thanks again,
> > John
> >
> > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > discovery would be very useful for kafka topic scale partitions
> > > scenarios.
> > >
> > > +1 for the change.
> > >
> > > CC: Becket
> > >
> > > Best,
> > > Leonard
> > >
> > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >> +1 for the change. I think this is beneficial for users and is compatible.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com> wrote:
> > >>
> > >>>> +1 for this idea, we have enabled kafka dynamic partition discovery in
> > >>>> all jobs.
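To make the semantics discussed in this thread concrete, here is a minimal, stdlib-only Java sketch. The class and method names are hypothetical and this is not the KafkaSource enumerator's actual code. It shows the intended behavior: partitions found in the first discovery round use the user's offset initializer, partitions that appear in later rounds start from the earliest offset, and partitions that vanish are only detected, since how to handle them is still an open question. The 5-minute interval is an assumption mirroring Kafka's `metadata.max.age.ms` default. In Flink, the discovery interval itself is set through the `partition.discovery.interval.ms` source property.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of periodic partition discovery; not Flink's KafkaSourceEnumerator. */
public class PartitionDiscoverySketch {

    /** Where a newly assigned partition should start reading from. */
    enum StartFrom { USER_INITIALIZER, EARLIEST }

    // Assumed default, mirroring Kafka's metadata.max.age.ms default of 5 minutes.
    static final Duration DISCOVERY_INTERVAL = Duration.ofMinutes(5);

    // Partitions seen in any discovery round so far.
    private final Set<String> known = new TreeSet<>();
    private boolean initialDiscoveryDone = false;

    /**
     * Processes one discovery round and returns only the partitions that are new.
     * First-round partitions use the user's offset initializer; later ones start
     * from EARLIEST so records written before discovery are not skipped.
     */
    Map<String, StartFrom> onDiscovery(Set<String> discovered) {
        Map<String, StartFrom> newAssignments = new HashMap<>();
        for (String partition : discovered) {
            if (known.add(partition)) {
                newAssignments.put(partition,
                        initialDiscoveryDone ? StartFrom.EARLIEST : StartFrom.USER_INITIALIZER);
            }
        }
        initialDiscoveryDone = true;
        return newAssignments;
    }

    /** Only detects known partitions missing from the latest round; handling them is left open. */
    Set<String> removedSince(Set<String> discovered) {
        Set<String> removed = new TreeSet<>(known);
        removed.removeAll(discovered);
        return removed;
    }

    public static void main(String[] args) {
        PartitionDiscoverySketch sketch = new PartitionDiscoverySketch();
        System.out.println(sketch.onDiscovery(Set.of("orders-0", "orders-1")));
        System.out.println(sketch.onDiscovery(Set.of("orders-0", "orders-1", "orders-2")));
        System.out.println(sketch.removedSince(Set.of("orders-0", "orders-2")));
        System.out.println(DISCOVERY_INTERVAL.toMillis());
    }
}
```

In the usage in `main`, `orders-2` appears only in the second round and is therefore assigned `EARLIEST`, while `orders-1` is reported as removed but otherwise left untouched. Restoring from a checkpoint is not modeled here.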