Thanks for the input Becket!

I reorganized this proposal into FLIP-288 [1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source

Best,
Qingsheng

On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com> wrote:

> Thanks for the proposal, Qingsheng.
>
> +1 to enable auto partition discovery by default. Just a reminder, we need
> a FLIP for this.
>
> A bit more background on this.
>
> Most of the Kafka users simply subscribe to a topic and let the consumer to
> automatically adapt to partition changes. So enabling auto partition
> discovery would align with that experience. The counter argument last time
> when I proposed to enable auto partition discovery was mainly due to the
> concern from the Flink users. There were arguments that sometimes users
> don't want the partition changes to get automatically picked up, but want
> to do this by restarting the job manually so they can avoid unnoticed
> changes in the jobs.
>
> Given that in the old Flink source, by default the auto partition discovery
> was disabled, and there are use cases from both sides, we simply kept the
> behavior unchanged. From the discussion we have here, it looks like
> enabling auto partition discovery is much preferred. So I think we should
> do it.
>
> I am not worried about the performance. The new Kafka source will only have
> the SplitEnumerator sending metadata requests when the feature is enabled.
> It is actually much cheaper than the old Kafka source where every
> subtask does that.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
>
> > +1 for this proposal and thanks Qingsheng for driving this.
> >
> > Considering the interval, we also set the value as 5min, equivalent to
> the
> > default value of metadata.max.age.ms.
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Benchao Li <libenc...@apache.org>
> > Sent: Friday, January 13, 2023 23:06
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in
> > Kafka source
> >
> > +1, we've enabled this by default (10mins) in our production for years.
> >
> > Jing Ge <j...@ververica.com.invalid> 于2023年1月13日周五 22:22写道:
> >
> > > +1 for the proposal that makes users' daily work easier and therefore
> > makes
> > > Flink more attractive.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org>
> wrote:
> > >
> > > > Thanks everyone for joining the discussion!
> > > >
> > > > @Martijn:
> > > >
> > > > > All newly discovered partitions will be consumed from the earliest
> > > offset
> > > > possible.
> > > >
> > > > Thanks for the reminder! I checked the logic of KafkaSource and found
> > > that
> > > > new partitions will start from the offset initializer specified by
> the
> > > user
> > > > instead of the earliest. We need to correct this behavior to avoid
> > > dropping
> > > > messages from new partitions.
> > > >
> > > > > Job restarts from checkpoint
> > > >
> > > > I think the current logic guarantees the exactly-once semantic. New
> > > > partitions created after the checkpoint will be re-discovered again
> and
> > > > picked up by the source.
> > > >
> > > > @John:
> > > >
> > > > > If you want to be a little conservative with the default, 5 minutes
> > > might
> > > > be better than 30 seconds.
> > > >
> > > > Thanks for the suggestion! I tried to find the equivalent config in
> > Kafka
> > > > but missed it. It would be neat to align with the default value of "
> > > > metadata.max.age.ms".
> > > >
> > > > @Gabor:
> > > >
> > > > > removed partition handling is not yet added
> > > >
> > > > There was a detailed discussion about removing partitions [1] but it
> > > looks
> > > > like this is not an easy task considering the potential data loss and
> > > state
> > > > inconsistency. I'm afraid there's no clear plan on this one and maybe
> > we
> > > > could trigger a new discussion thread about how to correctly handle
> > > removed
> > > > partitions.
> > > >
> > > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> > > >
> > > > Best regards,
> > > > Qingsheng
> > > >
> > > >
> > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <
> > gabor.g.somo...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > +1 on the overall direction, it's an important feature.
> > > > >
> > > > > I've had a look on the latest master and looks like removed
> partition
> > > > > handling is not yet added but I think this is essential.
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > > > >
> > > > > If a partition all of a sudden disappears then it could lead to
> data
> > > > loss.
> > > > > Are you planning to add it?
> > > > > If yes then when?
> > > > >
> > > > > G
> > > > >
> > > > >
> > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Thanks for this proposal, Qingsheng!
> > > > > >
> > > > > > If you want to be a little conservative with the default, 5
> minutes
> > > > might
> > > > > > be better than 30 seconds.
> > > > > >
> > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > > > > >
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> > > > > ),
> > > > > > which has a default value of 5 minutes.
> > > > > >
> > > > > > Other than that, I’m in favor. I agree, this should be on by
> > default.
> > > > > >
> > > > > > Thanks again,
> > > > > > John
> > > > > >
> > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > > > > > discovery would be very useful for kafka topic scale partitions
> > > > > > > scenarios.
> > > > > > >
> > > > > > > +1 for the change.
> > > > > > >
> > > > > > > CC: Becket
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Leonard
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com>
> wrote:
> > > > > > >>
> > > > > > >> +1 for the change. I think this is beneficial for users and is
> > > > > > compatible.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Jark
> > > > > > >>
> > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com>
> > wrote:
> > > > > > >>
> > > > > > >>>>
> > > > > > >>>> +1 for this idea, we have enabled kafka dynamic partition
> > > > discovery
> > > > > in
> > > > > > >>> all
> > > > > > >>>> jobs.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>

Reply via email to