I'm joining the discussion a bit late, but I want to add my +1 as well :-)

Historically, dynamic partition discovery in the earlier versions of the
FlinkKafkaConsumer was implemented such that multiple source subtasks
would query the Kafka brokers in parallel for topic/partition metadata at
the configured discovery interval. There were concerns about this at a
larger scale, hence the feature was disabled by default.

I don't see any reason not to enable this by default for the latest
implementation based on the KafkaSourceEnumerator.

That being said - this is essentially a breaking user-facing change in that
it has functional side effects, but I don't see any way of introducing this
without a breaking change either.
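
For users who would rather keep the old behavior once the default flips,
pinning the setting explicitly should be enough. Below is a minimal sketch,
assuming the `partition.discovery.interval.ms` source property keeps its
current meaning (a non-positive value disables periodic discovery) and
assuming the 5 minute default proposed in this thread. The class and helper
names are hypothetical and only the stdlib `Properties` class is used, so
this is not the connector's actual code:

```java
import java.util.Properties;

public class DiscoveryIntervalSketch {
    // Property key the Kafka source reads for the discovery interval.
    static final String DISCOVERY_INTERVAL_KEY = "partition.discovery.interval.ms";

    // Hypothetical helper: copy the properties and pin the interval explicitly.
    static Properties withDiscoveryInterval(Properties base, long intervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(intervalMs));
        return props;
    }

    // Hypothetical helper mirroring the assumed semantics: an explicit value
    // wins over the default, and only a positive interval enables discovery.
    static boolean discoveryEnabled(Properties props, long defaultIntervalMs) {
        String raw = props.getProperty(DISCOVERY_INTERVAL_KEY);
        long interval = (raw == null) ? defaultIntervalMs : Long.parseLong(raw);
        return interval > 0;
    }

    public static void main(String[] args) {
        long oldDefault = -1L;          // disabled by default today
        long proposedDefault = 300_000L; // 5 minutes, as suggested in this thread

        Properties untouched = new Properties();
        Properties pinnedOff = withDiscoveryInterval(untouched, -1L);

        System.out.println(discoveryEnabled(untouched, oldDefault));      // false
        System.out.println(discoveryEnabled(untouched, proposedDefault)); // true
        System.out.println(discoveryEnabled(pinnedOff, proposedDefault)); // false
    }
}
```

The middle case is the breaking one: a job that never touched the setting
changes behavior on upgrade, while a job that pins it stays as it was.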

I imagine that the users most likely to be caught by surprise are those
who use regex topic pattern subscription but did not enable partition
discovery. We need to be diligent in documenting this change (including
in blog posts).
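
To make that surprise concrete: with pattern subscription, a topic created
after job start would be picked up on the next discovery pass if its name
matches. Here is a rough sketch using only `java.util.regex`, assuming
full-match semantics on the topic name. The topic names and the
`matchingTopics` helper are made up for illustration, and this is not the
connector's subscriber code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {
    // Hypothetical helper: the topics a discovery pass would subscribe to,
    // given the full list of topics currently present in the cluster.
    static List<String> matchingTopics(Pattern pattern, List<String> clusterTopics) {
        return clusterTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders-.*");

        // Topics present when the job starts.
        List<String> atStartup = Arrays.asList("orders-us", "payments");
        // Someone creates "orders-eu" while the job is running.
        List<String> later = Arrays.asList("orders-us", "payments", "orders-eu");

        System.out.println(matchingTopics(pattern, atStartup)); // [orders-us]
        System.out.println(matchingTopics(pattern, later));     // [orders-us, orders-eu]
    }
}
```

Without discovery the job keeps reading only what it saw at startup; with
discovery on by default, the second result takes effect without a restart.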

> removed partition handling is not yet added

I agree with @Qingsheng that this can be an orthogonal topic outside the
scope of the planned changes here, as it isn't straightforward.

On Fri, Feb 10, 2023 at 12:56 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Oh and Mason, definitely interesting! :)
>
> On Fri, Feb 10, 2023 at 9:51 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
> > @Qingsheng what are your next steps for this proposal?
> >
> > On Thu, Jan 19, 2023 at 9:14 AM Mason Chen <mas.chen6...@gmail.com>
> wrote:
> >
> >> Hi all,
> >>
> >> Sorry to come into the discussion late--I saw the thread earlier.
> >>
> >> I'm also +1 for the change in general. I think most users have this
> turned
> >> on by default since the overhead is quite low. A default in the two
> digit
> >> seconds range works well for us. However, I do have two main concerns
> that
> >> are related, but don't necessarily block this FLIP:
> >>
> >> 1. Timestamp Offset Initializer
> >>
> >> Currently, the timestamp offset initializer defaults the offset reset
> >> strategy to LATEST. This can present some problems if the discovery
> >> interval is set too large since records from new partitions could be
> >> skipped (the set timestamp is not found in Kafka, thus resetting to the
> >> latest). Here is a ticket to allow customizations:
> >> https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you might
> >> remember this from a PR review). Thanks for mentioning this in your
> FLIP!
> >>
> >> 2. AdminClient Fault Tolerance
> >>
> >> AdminClient, which is used for partition discovery, seems not to handle
> >> Kafka timeouts as robustly as the KafkaConsumer API, and we have noticed
> >> that transient network hiccups cause full job restarts (since the
> >> jobmanager fails) in numerous incidents. Internally, we have introduced
> an
> >> error handling strategy based on the number of consecutive partition
> >> discovery failures. I'm interested in opening a JIRA ticket to
> contribute
> >> this feature back to Flink and open making the error handling more
> >> pluggable. What do you think?
> >>
> >> Best,
> >> Mason
> >>
> >> On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org>
> wrote:
> >>
> >> > Thanks for the input Becket!
> >> >
> >> > I reorganized this proposal into FLIP-288 [1].
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> >> >
> >> > Best,
> >> > Qingsheng
> >> >
> >> > On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com>
> >> wrote:
> >> >
> >> > > Thanks for the proposal, Qingsheng.
> >> > >
> >> > > +1 to enable auto partition discovery by default. Just a reminder,
> we
> >> > need
> >> > > a FLIP for this.
> >> > >
> >> > > A bit more background on this.
> >> > >
> >> > > Most of the Kafka users simply subscribe to a topic and let the
> >> consumer
> >> > to
> >> > > automatically adapt to partition changes. So enabling auto partition
> >> > > discovery would align with that experience. The counter argument
> last
> >> > time
> >> > > when I proposed to enable auto partition discovery was mainly due to
> >> the
> >> > > concern from the Flink users. There were arguments that sometimes
> >> users
> >> > > don't want the partition changes to get automatically picked up, but
> >> want
> >> > > to do this by restarting the job manually so they can avoid
> unnoticed
> >> > > changes in the jobs.
> >> > >
> >> > > Given that in the old Flink source, by default the auto partition
> >> > discovery
> >> > > was disabled, and there are use cases from both sides, we simply
> kept
> >> the
> >> > > behavior unchanged. From the discussion we have here, it looks like
> >> > > enabling auto partition discovery is much preferred. So I think we
> >> should
> >> > > do it.
> >> > >
> >> > > I am not worried about the performance. The new Kafka source will
> only
> >> > have
> >> > > the SplitEnumerator sending metadata requests when the feature is
> >> > enabled.
> >> > > It is actually much cheaper than the old Kafka source where every
> >> > > subtask does that.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > >
> >> > >
> >> > > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
> >> > >
> >> > > > +1 for this proposal and thanks Qingsheng for driving this.
> >> > > >
> >> > > > Considering the interval, we also set the value as 5min,
> equivalent
> >> to
> >> > > the
> >> > > > default value of metadata.max.age.ms.
> >> > > >
> >> > > >
> >> > > > Best
> >> > > > Yun Tang
> >> > > > ________________________________
> >> > > > From: Benchao Li <libenc...@apache.org>
> >> > > > Sent: Friday, January 13, 2023 23:06
> >> > > > To: dev@flink.apache.org <dev@flink.apache.org>
> >> > > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by
> >> default
> >> > in
> >> > > > Kafka source
> >> > > >
> >> > > > +1, we've enabled this by default (10mins) in our production for
> >> years.
> >> > > >
> >> > > > Jing Ge <j...@ververica.com.invalid> 于2023年1月13日周五 22:22写道:
> >> > > >
> >> > > > > +1 for the proposal that makes users' daily work easier and
> >> therefore
> >> > > > makes
> >> > > > > Flink more attractive.
> >> > > > >
> >> > > > > Best regards,
> >> > > > > Jing
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <
> re...@apache.org>
> >> > > wrote:
> >> > > > >
> >> > > > > > Thanks everyone for joining the discussion!
> >> > > > > >
> >> > > > > > @Martijn:
> >> > > > > >
> >> > > > > > > All newly discovered partitions will be consumed from the
> >> > earliest
> >> > > > > offset
> >> > > > > > possible.
> >> > > > > >
> >> > > > > > Thanks for the reminder! I checked the logic of KafkaSource
> and
> >> > found
> >> > > > > that
> >> > > > > > new partitions will start from the offset initializer
> specified
> >> by
> >> > > the
> >> > > > > user
> >> > > > > > instead of the earliest. We need to correct this behavior to
> >> avoid
> >> > > > > dropping
> >> > > > > > messages from new partitions.
> >> > > > > >
> >> > > > > > > Job restarts from checkpoint
> >> > > > > >
> >> > > > > > I think the current logic guarantees the exactly-once
> semantic.
> >> New
> >> > > > > > partitions created after the checkpoint will be re-discovered
> >> again
> >> > > and
> >> > > > > > picked up by the source.
> >> > > > > >
> >> > > > > > @John:
> >> > > > > >
> >> > > > > > > If you want to be a little conservative with the default, 5
> >> > minutes
> >> > > > > might
> >> > > > > > be better than 30 seconds.
> >> > > > > >
> >> > > > > > Thanks for the suggestion! I tried to find the equivalent
> >> config in
> >> > > > Kafka
> >> > > > > > but missed it. It would be neat to align with the default
> value
> >> of
> >> > "
> >> > > > > > metadata.max.age.ms".
> >> > > > > >
> >> > > > > > @Gabor:
> >> > > > > >
> >> > > > > > > removed partition handling is not yet added
> >> > > > > >
> >> > > > > > There was a detailed discussion about removing partitions [1]
> >> but
> >> > it
> >> > > > > looks
> >> > > > > > like this is not an easy task considering the potential data
> >> loss
> >> > and
> >> > > > > state
> >> > > > > > inconsistency. I'm afraid there's no clear plan on this one
> and
> >> > maybe
> >> > > > we
> >> > > > > > could trigger a new discussion thread about how to correctly
> >> handle
> >> > > > > removed
> >> > > > > > partitions.
> >> > > > > >
> >> > > > > > [1]
> >> > https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> >> > > > > >
> >> > > > > > Best regards,
> >> > > > > > Qingsheng
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <
> >> > > > gabor.g.somo...@gmail.com
> >> > > > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > +1 on the overall direction, it's an important feature.
> >> > > > > > >
> >> > > > > > > I've had a look on the latest master and looks like removed
> >> > > partition
> >> > > > > > > handling is not yet added but I think this is essential.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> >> > > > > > >
> >> > > > > > > If a partition all of a sudden disappears then it could lead
> >> to
> >> > > data
> >> > > > > > loss.
> >> > > > > > > Are you planning to add it?
> >> > > > > > > If yes then when?
> >> > > > > > >
> >> > > > > > > G
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <
> >> > vvcep...@apache.org>
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Thanks for this proposal, Qingsheng!
> >> > > > > > > >
> >> > > > > > > > If you want to be a little conservative with the default,
> 5
> >> > > minutes
> >> > > > > > might
> >> > > > > > > > be better than 30 seconds.
> >> > > > > > > >
> >> > > > > > > > The equivalent config in Kafka seems to be
> >> metadata.max.age.ms
> >> > (
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> >> > > > > > > ),
> >> > > > > > > > which has a default value of 5 minutes.
> >> > > > > > > >
> >> > > > > > > > Other than that, I’m in favor. I agree, this should be on
> by
> >> > > > default.
> >> > > > > > > >
> >> > > > > > > > Thanks again,
> >> > > > > > > > John
> >> > > > > > > >
> >> > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> >> > > > > > > > > Thanks Qingsheng for driving this, enable the dynamic
> >> > partition
> >> > > > > > > > > discovery would be very useful for kafka topic scale
> >> > partitions
> >> > > > > > > > > scenarios.
> >> > > > > > > > >
> >> > > > > > > > > +1 for the change.
> >> > > > > > > > >
> >> > > > > > > > > CC: Becket
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Leonard
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com
> >
> >> > > wrote:
> >> > > > > > > > >>
> >> > > > > > > > >> +1 for the change. I think this is beneficial for users
> >> and
> >> > is
> >> > > > > > > > compatible.
> >> > > > > > > > >>
> >> > > > > > > > >> Best,
> >> > > > > > > > >> Jark
> >> > > > > > > > >>
> >> > > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <
> xuehaijux...@gmail.com
> >> >
> >> > > > wrote:
> >> > > > > > > > >>
> >> > > > > > > > >>>>
> >> > > > > > > > >>>> +1 for this idea, we have enabled kafka dynamic
> >> partition
> >> > > > > > discovery
> >> > > > > > > in
> >> > > > > > > > >>> all
> >> > > > > > > > >>>> jobs.
> >> > > > > > > > >>>>
> >> > > > > > > > >>>>
> >> > > > > > > > >>>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > >
> >> > > > Best,
> >> > > > Benchao Li
> >> > > >
> >> > >
> >> >
> >>
> >
>
