Hi all,

Sorry to come into the discussion late--I saw the thread earlier.

I'm also +1 for the change in general. I think most users already have this
turned on since the overhead is quite low. A default in the two-digit
seconds range works well for us. However, I do have two main concerns that
are related, but don't necessarily block this FLIP:

1. Timestamp Offset Initializer

Currently, the timestamp offset initializer defaults the offset reset
strategy to LATEST. This can cause problems if the discovery interval is
set too large, since records from new partitions could be skipped (the
configured timestamp is not found in Kafka, so the source resets to the
latest offset). Here is a ticket to allow customizations:
https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you might
remember this from a PR review). Thanks for mentioning this in your FLIP!
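
To make the failure mode concrete, here is a minimal sketch in plain Java.
It is a simplified model, not Flink's or Kafka's actual code: the class
TimestampStartSketch, the ResetStrategy enum, and startingOffset are
hypothetical names, and a partition is modeled as a list of record
timestamps indexed by offset. It only shows how the fallback decides
whether records already written to a newly discovered partition get read.

```java
import java.util.List;

/** Simplified model of a timestamp-based start position; not Flink's implementation. */
final class TimestampStartSketch {

    enum ResetStrategy { EARLIEST, LATEST }

    /**
     * Returns the offset to start reading from. recordTimestamps.get(i) is the
     * timestamp of the record at offset i (assumed to be non-decreasing).
     */
    static long startingOffset(
            List<Long> recordTimestamps, long startTimestamp, ResetStrategy fallback) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= startTimestamp) {
                return offset; // first record at or after the requested timestamp
            }
        }
        // No record matches the timestamp, so the reset strategy decides.
        return fallback == ResetStrategy.LATEST ? recordTimestamps.size() : 0L;
    }

    public static void main(String[] args) {
        // A new partition that already holds three records when it is discovered.
        List<Long> timestamps = List.of(100L, 200L, 300L);
        // Timestamp found: start at offset 1.
        System.out.println(startingOffset(timestamps, 150L, ResetStrategy.LATEST));
        // Timestamp not found with LATEST: start at offset 3, skipping all three records.
        System.out.println(startingOffset(timestamps, 500L, ResetStrategy.LATEST));
        // Timestamp not found with EARLIEST: start at offset 0, nothing is skipped.
        System.out.println(startingOffset(timestamps, 500L, ResetStrategy.EARLIEST));
    }
}
```

In this model, a configurable fallback is what lets a job read records that
were written before the partition was discovered.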

2. AdminClient Fault Tolerance

AdminClient, which is used for partition discovery, seems not to handle
Kafka timeouts as robustly as the KafkaConsumer API, and we have noticed
that transient network hiccups cause full job restarts (since the
jobmanager fails) in numerous incidents. Internally, we have introduced an
error handling strategy based on the number of consecutive partition
discovery failures. I'm interested in opening a JIRA ticket to contribute
this feature back to Flink, and I'm open to making the error handling more
pluggable. What do you think?
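
Roughly, a strategy based on consecutive failures could look like the
sketch below. This is an illustration under assumptions, not our internal
implementation and not an existing Flink API: ConsecutiveFailureTolerance
and its methods are hypothetical names, and the partition discovery call is
modeled as a plain Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: tolerates a bounded number of consecutive discovery failures. */
final class ConsecutiveFailureTolerance {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;

    ConsecutiveFailureTolerance(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /**
     * Runs one discovery attempt. Returns true on success and false if the failure
     * was tolerated; rethrows once the failure budget is exhausted.
     */
    boolean attempt(Callable<?> discovery) throws Exception {
        try {
            discovery.call();
            consecutiveFailures = 0; // any success resets the streak
            return true;
        } catch (Exception e) {
            consecutiveFailures++;
            if (consecutiveFailures > maxConsecutiveFailures) {
                throw e; // persistent failure: propagate and let the job fail
            }
            return false; // transient failure: skip this round, retry at the next interval
        }
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }

    public static void main(String[] args) throws Exception {
        ConsecutiveFailureTolerance tolerance = new ConsecutiveFailureTolerance(2);
        Callable<Void> timeout = () -> {
            throw new TimeoutException("metadata request timed out");
        };
        System.out.println(tolerance.attempt(timeout));    // false: failure tolerated
        System.out.println(tolerance.attempt(() -> null)); // true: success resets the streak
    }
}
```

A pluggable version would presumably put the catch branch behind an
interface so users can choose between failing fast and tolerating
transient errors.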

Best,
Mason

On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org> wrote:

> Thanks for the input Becket!
>
> I reorganized this proposal into FLIP-288 [1].
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>
> Best,
> Qingsheng
>
> On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Thanks for the proposal, Qingsheng.
> >
> > +1 to enable auto partition discovery by default. Just a reminder, we need
> > a FLIP for this.
> >
> > A bit more background on this.
> >
> > Most of the Kafka users simply subscribe to a topic and let the consumer
> > automatically adapt to partition changes. So enabling auto partition
> > discovery would align with that experience. The counter argument last time
> > when I proposed to enable auto partition discovery was mainly due to the
> > concern from the Flink users. There were arguments that sometimes users
> > don't want the partition changes to get automatically picked up, but want
> > to do this by restarting the job manually so they can avoid unnoticed
> > changes in the jobs.
> >
> > Given that in the old Flink source, by default the auto partition discovery
> > was disabled, and there are use cases from both sides, we simply kept the
> > behavior unchanged. From the discussion we have here, it looks like
> > enabling auto partition discovery is much preferred. So I think we should
> > do it.
> >
> > I am not worried about the performance. The new Kafka source will only have
> > the SplitEnumerator sending metadata requests when the feature is enabled.
> > It is actually much cheaper than the old Kafka source where every
> > subtask does that.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
> >
> > > +1 for this proposal and thanks Qingsheng for driving this.
> > >
> > > Regarding the interval, we also set the value to 5 min, equivalent to the
> > > default value of metadata.max.age.ms.
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Benchao Li <libenc...@apache.org>
> > > Sent: Friday, January 13, 2023 23:06
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
> > >
> > > +1, we've enabled this by default (10mins) in our production for years.
> > >
> > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jan 13, 2023 at 22:22:
> > >
> > > > +1 for the proposal that makes users' daily work easier and therefore makes
> > > > Flink more attractive.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > >
> > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org> wrote:
> > > >
> > > > > Thanks everyone for joining the discussion!
> > > > >
> > > > > @Martijn:
> > > > >
> > > > > > All newly discovered partitions will be consumed from the earliest
> > > > > > offset possible.
> > > > >
> > > > > Thanks for the reminder! I checked the logic of KafkaSource and found that
> > > > > new partitions will start from the offset initializer specified by the user
> > > > > instead of the earliest. We need to correct this behavior to avoid dropping
> > > > > messages from new partitions.
> > > > >
> > > > > > Job restarts from checkpoint
> > > > >
> > > > > I think the current logic guarantees exactly-once semantics. New
> > > > > partitions created after the checkpoint will be rediscovered and
> > > > > picked up by the source.
> > > > >
> > > > > @John:
> > > > >
> > > > > > If you want to be a little conservative with the default, 5 minutes
> > > > > > might be better than 30 seconds.
> > > > >
> > > > > Thanks for the suggestion! I tried to find the equivalent config in Kafka
> > > > > but missed it. It would be neat to align with the default value of
> > > > > "metadata.max.age.ms".
> > > > >
> > > > > @Gabor:
> > > > >
> > > > > > removed partition handling is not yet added
> > > > >
> > > > > There was a detailed discussion about removing partitions [1] but it looks
> > > > > like this is not an easy task considering the potential data loss and state
> > > > > inconsistency. I'm afraid there's no clear plan on this one and maybe we
> > > > > could trigger a new discussion thread about how to correctly handle removed
> > > > > partitions.
> > > > >
> > > > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> > > > >
> > > > > Best regards,
> > > > > Qingsheng
> > > > >
> > > > >
> > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > > >
> > > > > > +1 on the overall direction, it's an important feature.
> > > > > >
> > > > > > I've had a look at the latest master and it looks like removed partition
> > > > > > handling is not yet added, but I think this is essential.
> > > > > >
> > > > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > > > > >
> > > > > > If a partition suddenly disappears, it could lead to data loss.
> > > > > > Are you planning to add it?
> > > > > > If yes, then when?
> > > > > >
> > > > > > G
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > >
> > > > > > > Thanks for this proposal, Qingsheng!
> > > > > > >
> > > > > > > If you want to be a little conservative with the default, 5 minutes
> > > > > > > might be better than 30 seconds.
> > > > > > >
> > > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms
> > > > > > > (https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
> > > > > > > which has a default value of 5 minutes.
> > > > > > >
> > > > > > > Other than that, I’m in favor. I agree, this should be on by default.
> > > > > > >
> > > > > > > Thanks again,
> > > > > > > John
> > > > > > >
> > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > > > > Thanks Qingsheng for driving this. Enabling dynamic partition
> > > > > > > > discovery would be very useful for scenarios where Kafka topics
> > > > > > > > scale up their partitions.
> > > > > > > >
> > > > > > > > +1 for the change.
> > > > > > > >
> > > > > > > > CC: Becket
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Leonard
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >> +1 for the change. I think this is beneficial for users and is
> > > > > > > >> compatible.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Jark
> > > > > > > >>
> > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>>>
> > > > > > > >>>> +1 for this idea, we have enabled Kafka dynamic partition discovery in
> > > > > > > >>>> all jobs.
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
