Hi Xuannan,

I think Hang raised a good question about the conflict between
SplitEnumeratorContext#setIsProcessingBacklog and
backlog.watermark-lag-threshold.
Yes, you can define a precedence rule, but some people will still
misunderstand it (my gut feeling is that it's an overwrite rule).

I think a better API doesn't compete with itself. Therefore, I'm in favor of
supporting the watermark lag threshold per source without introducing
any framework API or configuration. For example, KafkaSourceBuilder could
have a setProcessingBacklogWatermarkLagThreshold method that uses
SplitEnumeratorContext#setIsProcessingBacklog under the hood. Actually,
this is the approach listed in the Rejected Alternatives of this FLIP.
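
To make the idea concrete, here is a rough sketch of what the per-source
approach could look like. It is plain Java with no Flink dependency:
BacklogReporter is a hypothetical stand-in for
SplitEnumeratorContext#setIsProcessingBacklog, and SketchSourceBuilder and
WatermarkLagBacklogDetector are made-up names, not existing connector
classes. The lag formula follows the definition given earlier in this thread
(watermarkLag = CurrentTime - Watermark); reporting the flag in both
directions is an assumption of the sketch, not something the FLIP specifies.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical stand-in for SplitEnumeratorContext#setIsProcessingBacklog. */
interface BacklogReporter {
    void setIsProcessingBacklog(boolean isProcessingBacklog);
}

/** Hypothetical per-source detector; not part of Flink or any connector. */
final class WatermarkLagBacklogDetector {
    private final long thresholdMillis;
    private final BacklogReporter reporter;

    WatermarkLagBacklogDetector(Duration threshold, BacklogReporter reporter) {
        this.thresholdMillis = threshold.toMillis();
        this.reporter = reporter;
    }

    /** Intended to be called whenever the source emits a watermark downstream. */
    void onWatermarkEmitted(long watermarkMillis, long currentTimeMillis) {
        // watermarkLag = CurrentTime - Watermark
        long watermarkLag = currentTimeMillis - watermarkMillis;
        // Assumption: report in both directions, so the source can leave backlog mode.
        reporter.setIsProcessingBacklog(watermarkLag > thresholdMillis);
    }
}

/** Hypothetical builder showing where the per-source option would live. */
final class SketchSourceBuilder {
    private Duration lagThreshold; // null: no watermark-lag-based backlog detection

    SketchSourceBuilder setProcessingBacklogWatermarkLagThreshold(Duration threshold) {
        this.lagThreshold = threshold;
        return this;
    }

    /** Returns an empty Optional when no threshold was configured. */
    Optional<WatermarkLagBacklogDetector> buildDetector(BacklogReporter reporter) {
        return Optional.ofNullable(lagThreshold)
                .map(t -> new WatermarkLagBacklogDetector(t, reporter));
    }
}

final class BacklogSketchDemo {
    public static void main(String[] args) {
        WatermarkLagBacklogDetector detector = new SketchSourceBuilder()
                .setProcessingBacklogWatermarkLagThreshold(Duration.ofHours(1))
                .buildDetector(flag -> System.out.println("isProcessingBacklog=" + flag))
                .orElseThrow(IllegalStateException::new);
        long now = System.currentTimeMillis();
        detector.onWatermarkEmitted(now - Duration.ofHours(2).toMillis(), now); // 2h lag
        detector.onWatermarkEmitted(now - 1_000L, now);                         // 1s lag
    }
}
```

With this shape there is only one place where the flag is set for a given
source, so no precedence rule between a job-level config and the source's
own logic is needed.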

Besides, this addresses another concern: the watermark may be
generated by DataStream#assignTimestampsAndWatermarks, which doesn't
work with the backlog.watermark-lag-threshold job config (it is only applied
to the FLIP-27 SourceOperator). This non-working behavior may confuse
users a lot. If we support the watermark lag threshold per source,
then naturally only sources with watermarking can have a watermark lag
threshold, which avoids this problem.

One more question about the FLIP: it says "Note that this config
does not support switching source's isProcessingBacklog from false to true
for now."
Does that mean the job can never switch back to streaming mode once it has
switched into backlog mode? That doesn't sound like a complete FLIP to me.
Is it possible to support switching back in this FLIP?

Best,
Jark


On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi all,
>
> Thank you for all the reviews and suggestions.
>
> I believe all the comments have been addressed. If there are no
> further comments, I plan to open the voting thread for this FLIP early
> next week.
>
> Best regards,
> Xuannan
>
> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge <j...@ververica.com.invalid>
> wrote:
> >
> > Hi Xuannan,
> >
> > I thought FLIP-328 would compete with FLIP-309 when setting the value of
> > the backlog. Understood. Thanks for the hint.
> >
> > Best regards,
> > Jing
> >
> > On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <suxuanna...@gmail.com>
> wrote:
> >
> > > Hi Jing,
> > >
> > > Thank you for the clarification.
> > >
> > > For the use case you mentioned, I believe we can utilize the
> > > HybridSource, as updated in FLIP-309[1], to determine the backlog
> > > status. For example, if the user wants to process data before time T
> > > in batch mode and after time T in stream mode, they can set the first
> > > source of the HybridSource to read up to time T and the last source of
> > > the HybridSource to read from time T.
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >
> > >
> > > On Mon, Sep 4, 2023 at 10:36 PM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the clarification.
> > > >
> > > > 3. Event time and processing time are two different things. It might be
> > > > rarely used, but conceptually, users can process data from the past
> > > > within a specific time range in streaming mode. All data before that
> > > > range will be considered backlog and will need to be processed in batch
> > > > mode, similar to, e.g., the Present Perfect Progressive tense in English.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <suxuanna...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Jing,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 1. You are absolutely right that the watermark lag threshold must be
> > > > > carefully set with a thorough understanding of watermark generation.
> > > > > It is crucial for users to take into account the WatermarkStrategy
> > > > > when setting the watermark lag threshold.
> > > > >
> > > > > 2. Regarding pure processing-time based stream processing jobs,
> > > > > alternative strategies will be implemented to determine whether the
> > > job is
> > > > > processing backlog data. I have outlined two possible strategies
> below:
> > > > >
> > > > > - Based on the source operator's state. For example, when MySQL CDC
> > > source
> > > > > is reading snapshot, it can claim isBacklog=true.
> > > > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > > > isBacklog=true.
> > > > >
> > > > > As for the strategy proposed in this FLIP, it relies on generated
> > > > > watermarks. Therefore, if a user intends for the job to detect backlog
> > > > > status based on the watermark, it is necessary to generate watermarks.
> > > > >
> > > > > 3. I'm afraid I'm not fully grasping your question. From my
> > > understanding,
> > > > > it should work in both cases. When event times are close to the
> > > processing
> > > > > time, resulting in watermarks close to the processing time, the
> job is
> > > not
> > > > > processing backlog data. On the other hand, when event times are
> far
> > > from
> > > > > processing time, causing watermarks to also be distant, if the lag
> > > > > surpasses the defined threshold, the job is considered processing
> > > backlog
> > > > > data.
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > > > >
> > > > > > On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID>
> > > wrote:
> > > > > >
> > > > > > Hi Xuannan,
> > > > > >
> > > > > > Thanks for the clarification. That is the part where I am trying
> to
> > > > > > understand your thoughts. I have some follow-up questions:
> > > > > >
> > > > > > 1. It depends strongly on the watermarkStrategy and on what the
> > > > > > customized watermark generation looks like. It mixes business logic
> > > > > > with technical implementation and the technical data processing mode.
> > > > > > The value of the watermark lag threshold must be set very carefully.
> > > > > > If the value is too small, then any time the watermark generation
> > > > > > logic is changed (business logic changes lead to the threshold being
> > > > > > exceeded), the same job might surprisingly run in backlog processing
> > > > > > mode, i.e. a butterfly effect. Comprehensive documentation is
> > > > > > required to avoid any confusion for the users.
> > > > > > 2. Like Jark already mentioned, use cases that do not have
> > > watermarks,
> > > > > > like pure processing-time based stream processing[1] are not
> > > covered. It
> > > > > is
> > > > > > more or less a trade-off solution that does not support such use
> > > cases
> > > > > and
> > > > > > appropriate documentation is required. Forcing them to explicitly
> > > > > generate
> > > > > > watermarks that are never needed just because of this does not
> sound
> > > > > like a
> > > > > > proper solution.
> > > > > > 3. If I am not mistaken, it only works for use cases where event
> > > times
> > > > > are
> > > > > > very close to the processing times, because the wall clock is
> used to
> > > > > > calculate the watermark lag and the watermark is generated based
> on
> > > the
> > > > > > event time.
> > > > > >
> > > > > > Best regards,
> > > > > > Jing
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > > >
> > > > > > On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> suxuanna...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Hi Jing,
> > > > > >>
> > > > > >> Thank you for the suggestion.
> > > > > >>
> > > > > > >> The definition of watermark lag is the same as the watermarkLag metric
> > > > > > >> in FLIP-33[1]. More specifically, the watermark lag is computed at the
> > > > > > >> time when a watermark is emitted downstream, in the following way:
> > > > > > >> watermarkLag = CurrentTime - Watermark. I have added this description
> > > > > > >> to the FLIP.
> > > > > >>
> > > > > >> I hope this addresses your concern.
> > > > > >>
> > > > > >> Best,
> > > > > >> Xuannan
> > > > > >>
> > > > > >> [1]
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>
> > > > > >>
> > > > > >>> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID
> >
> > > wrote:
> > > > > >>>
> > > > > >>> Hi Xuannan,
> > > > > >>>
> > > > > >>> Thanks for the proposal. +1 for me.
> > > > > >>>
> > > > > >>> There is one tiny thing that I am not sure if I understand it
> > > > > correctly.
> > > > > >>> Since there will be many different WatermarkStrategies and
> > > different
> > > > > >>> WatermarkGenerators. Could you please update the FLIP and add
> the
> > > > > >>> description of how the watermark lag is calculated exactly?
> E.g.
> > > > > >> Watermark
> > > > > >>> lag = A - B with A is the timestamp of the watermark emitted
> to the
> > > > > >>> downstream and B is....(this is the part I am not really sure
> after
> > > > > >> reading
> > > > > >>> the FLIP).
> > > > > >>>
> > > > > >>> Best regards,
> > > > > >>> Jing
> > > > > >>>
> > > > > >>>
> > > > > >>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> suxuanna...@gmail.com>
> > > > > >> wrote:
> > > > > >>>
> > > > > >>>> Hi Jark,
> > > > > >>>>
> > > > > >>>> Thanks for the comments.
> > > > > >>>>
> > > > > >>>> I agree that the current solution cannot support jobs that
> cannot
> > > > > define
> > > > > >>>> watermarks. However, after considering the
> pending-record-based
> > > > > >> solution, I
> > > > > >>>> believe the current solution is superior for the target use
> case
> > > as it
> > > > > >> is
> > > > > >>>> more intuitive for users. The backlog status gives users the
> > > ability
> > > > > to
> > > > > >>>> balance between throughput and latency. Making this trade-off
> > > decision
> > > > > >>>> based on the watermark lag is more intuitive from the user's
> > > > > >> perspective.
> > > > > >>>> For instance, a user can decide that if the job lags behind
> the
> > > > > current
> > > > > >>>> time by more than 1 hour, the result is not usable. In that
> case,
> > > we
> > > > > can
> > > > > >>>> optimize for throughput when the data lags behind by more
> than an
> > > > > hour.
> > > > > >>>> With the pending-record-based solution, it's challenging for
> > > users to
> > > > > >>>> determine when to optimize for throughput and when to
> prioritize
> > > > > >> latency.
> > > > > >>>>
> > > > > >>>> Regarding the limitations of the watermark-based solution:
> > > > > >>>>
> > > > > >>>> 1. The current solution can support jobs with sources that
> have
> > > event
> > > > > >>>> time. Users can always define a watermark at the source
> operator,
> > > even
> > > > > >> if
> > > > > >>>> it's not used by downstream operators, such as streaming join
> and
> > > > > >> unbounded
> > > > > >>>> aggregate.
> > > > > >>>>
> > > > > > >>>> 2. I don't believe it's accurate to say that the watermark lag will
> > > > > > >>>> keep increasing if no data is generated in Kafka. The watermark lag
> > > > > > >>>> and backlog status are determined at the moment when the watermark is
> > > > > > >>>> emitted to the downstream operator. If no data is emitted from the
> > > > > > >>>> source, the watermark lag and backlog status will not be updated. If
> > > > > > >>>> a WatermarkStrategy with idleness is used, the source becomes
> > > > > > >>>> non-backlog when it becomes idle.
> > > > > >>>>
> > > > > >>>> 3. I think watermark lag is more intuitive to determine if a
> job
> > > is
> > > > > >>>> processing backlog data. Even when using pending records, it
> > > faces a
> > > > > >>>> similar issue. For example, if the source has 1K pending
> records,
> > > > > those
> > > > > >>>> records can span from 1 day  to 1 hour to 1 second. If the
> records
> > > > > span
> > > > > >> 1
> > > > > >>>> day, it's probably best to optimize for throughput. If they
> span 1
> > > > > >> hour, it
> > > > > >>>> depends on the business logic. If they span 1 second,
> optimizing
> > > for
> > > > > >>>> latency is likely the better choice.
> > > > > >>>>
> > > > > >>>> In summary, I believe the watermark-based solution is a
> superior
> > > > > choice
> > > > > >>>> for the target use case where watermark/event time can be
> defined.
> > > > > >>>> Additionally, I haven't come across a scenario that requires
> > > > > low-latency
> > > > > >>>> processing and reads from a source that cannot define
> watermarks.
> > > If
> > > > > we
> > > > > >>>> encounter such a use case, we can create another FLIP to
> address
> > > those
> > > > > >>>> needs in the future. What do you think?
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Xuannan
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com
> <mailto:
> > > > > >>>> imj...@gmail.com>> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Xuannan,
> > > > > >>>>>
> > > > > >>>>> Thanks for opening this discussion.
> > > > > >>>>>
> > > > > >>>>> This current proposal may work in the mentioned watermark
> cases.
> > > > > >>>>> However, it seems this is not a general solution for sources
> to
> > > > > >> determine
> > > > > >>>>> "isProcessingBacklog".
> > > > > >>>>> From my point of view, there are 3 limitations of the current
> > > > > proposal:
> > > > > >>>>> 1. It doesn't cover jobs that don't have watermark/event-time
> > > > > defined,
> > > > > >>>>> for example streaming join and unbounded aggregate. We may
> still
> > > need
> > > > > >> to
> > > > > >>>>> figure out solutions for them.
> > > > > > >>>>> 2. Watermark lag cannot be trusted, because it increases without
> > > > > > >>>>> bound if no data is generated in Kafka.
> > > > > > >>>>> But in this case, there is no backlog at all.
> > > > > > >>>>> 3. Watermark lag hardly reflects the amount of backlog. Whether the
> > > > > > >>>>> watermark lag is 1 day, 1 hour, or 1 second, there may be only
> > > > > > >>>>> 1 pending record, which means no backlog at all.
> > > > > >>>>>
> > > > > > >>>>> Therefore, IMO, watermark may not be the ideal metric for
> > > > > > >>>>> determining "isProcessingBacklog".
> > > > > >>>>> What we need is something that reflects the number of records
> > > > > >> unprocessed
> > > > > >>>>> by the job.
> > > > > >>>>> Actually, that is the "pendingRecords" metric proposed in
> > > FLIP-33 and
> > > > > >> has
> > > > > >>>>> been implemented by Kafka source.
> > > > > >>>>> Did you consider using "pendingRecords" metric to determine
> > > > > >>>>> "isProcessingBacklog"?
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Jark
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> [1]
> > > > > >>>>>
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>>> <
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > tonysong...@gmail.com
> > > > > >>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Sounds good to me.
> > > > > >>>>>>
> > > > > >>>>>> It is true that, if we are introducing the generalized
> > > watermark,
> > > > > >> there
> > > > > >>>>>> will be other watermark related concepts / configurations
> that
> > > need
> > > > > to
> > > > > >>>> be
> > > > > >>>>>> updated anyway.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> Best,
> > > > > >>>>>>
> > > > > >>>>>> Xintong
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > suxuanna...@gmail.com
> > > > > >>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > > >>>>>>
> > > > > > >>>>>>> Hi Xintong,
> > > > > >>>>>>>
> > > > > >>>>>>> Thank you for your suggestion.
> > > > > >>>>>>>
> > > > > >>>>>>> After considering the idea of using a general configuration
> > > key, I
> > > > > >>>> think
> > > > > >>>>>>> it may not be a good idea for the reasons below.
> > > > > >>>>>>>
> > > > > >>>>>>> While I agree that using a more general configuration key
> > > provides
> > > > > us
> > > > > >>>>>> with
> > > > > >>>>>>> the flexibility to switch to other approaches to calculate
> the
> > > lag
> > > > > in
> > > > > >>>> the
> > > > > >>>>>>> future, the downside is that it may cause confusion for
> users.
> > > We
> > > > > >>>>>> currently
> > > > > >>>>>>> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag
> in
> > > the
> > > > > >>>> source,
> > > > > >>>>>>> and it is not clear which specific lag we are referring to.
> > > With
> > > > > the
> > > > > >>>>>>> potential introduction of the Generalized Watermark
> mechanism
> > > in
> > > > > the
> > > > > >>>>>>> future, if I understand correctly, a watermark won't
> > > necessarily
> > > > > need
> > > > > >>>> to
> > > > > >>>>>> be
> > > > > > >>>>>>> a timestamp. I am concerned that the general configuration key may
> > > > > > >>>>>>> not be enough to cover all the use cases and that we will need to
> > > > > > >>>>>>> introduce a general way to determine the backlog status regardless.
> > > > > >>>>>>>
> > > > > > >>>>>>> For the reasons above, I prefer introducing the configuration as is
> > > > > > >>>>>>> and changing it later with a deprecation or migration process.
> > > > > > >>>>>>> What do you think?
> > > > > >>>>>>>
> > > > > >>>>>>> Best,
> > > > > >>>>>>> Xuannan
> > > > > >>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > tonysong...@gmail.com
> > > > > >>>> <mailto:tonysong...@gmail.com>>,
> > > > > >>>>>> wrote:
> > > > > >>>>>>>> Thanks for the explanation.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I wonder if it makes sense to not expose this detail via
> the
> > > > > >>>>>>> configuration
> > > > > >>>>>>>> option. To be specific, I suggest not mentioning the
> > > "watermark"
> > > > > >>>>>> keyword
> > > > > >>>>>>> in
> > > > > >>>>>>>> the configuration key and description.
> > > > > >>>>>>>>
> > > > > >>>>>>>> - From the users' perspective, I think they only need to
> know
> > > > > >> there's
> > > > > >>>> a
> > > > > >>>>>>>> lag higher than the given threshold, Flink will consider
> > > latency
> > > > > of
> > > > > >>>>>>>> individual records as less important and prioritize
> throughput
> > > > > over
> > > > > >>>> it.
> > > > > >>>>>>>> They don't really need the details of how the lags are
> > > calculated.
> > > > > >>>>>>>> - For the internal implementation, I also think using
> > > watermark
> > > > > lags
> > > > > >>>> is
> > > > > >>>>>>>> a good idea, for the reasons you've already mentioned.
> > > However,
> > > > > it's
> > > > > >>>>>> not
> > > > > >>>>>>>> the only possible option. Hiding this detail from users
> would
> > > give
> > > > > >> us
> > > > > >>>>>> the
> > > > > >>>>>>>> flexibility to switch to other approaches if needed in
> future.
> > > > > >>>>>>>> - We are currently working on designing the
> ProcessFunction
> > > API
> > > > > >>>>>>>> (consider it as a DataStream API V2). There's an idea to
> > > > > introduce a
> > > > > >>>>>>>> Generalized Watermark mechanism, where basically the
> > > watermark can
> > > > > >> be
> > > > > >>>>>>>> anything that needs to travel along the data-flow with
> certain
> > > > > >>>>>> alignment
> > > > > >>>>>>>> strategies, and event time watermark would be one specific
> > > case of
> > > > > >> it.
> > > > > >>>>>>> This
> > > > > >>>>>>>> is still an idea and has not been discussed and agreed on
> by
> > > the
> > > > > >>>>>>> community,
> > > > > >>>>>>>> and we are preparing a FLIP for it. But if we are going
> for
> > > it,
> > > > > the
> > > > > >>>>>>> concept
> > > > > >>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I do not intend to block the FLIP on this. I'd also be
> fine
> > > with
> > > > > >>>>>>>> introducing the configuration as is, and changing it
> later, if
> > > > > >> needed,
> > > > > >>>>>>> with
> > > > > >>>>>>>> a regular deprecation and migration process. Just making
> my
> > > > > >>>>>> suggestions.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> Best,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Xintong
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > > > suxuanna...@gmail.com
> > > > > >>>> <mailto:suxuanna...@gmail.com>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Hi Xintong,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thanks for the reply.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I have considered using the timestamp in the records to
> > > determine
> > > > > >> the
> > > > > >>>>>>>>> backlog status, and decided to use watermark at the end.
> By
> > > > > >>>>>> definition,
> > > > > >>>>>>>>> watermark is the time progress indication in the data
> > > stream. It
> > > > > >>>>>>> indicates
> > > > > >>>>>>>>> the stream’s event time has progressed to some specific
> > > time. On
> > > > > >> the
> > > > > >>>>>>> other
> > > > > >>>>>>>>> hand, timestamp in the records is usually used to
> generate
> > > the
> > > > > >>>>>>> watermark.
> > > > > >>>>>>>>> Therefore, it appears more appropriate and intuitive to
> > > calculate
> > > > > >> the
> > > > > >>>>>>> event
> > > > > >>>>>>>>> time lag by watermark and determine the backlog status.
> And
> > > by
> > > > > >> using
> > > > > >>>>>>> the
> > > > > >>>>>>>>> watermark, we can easily deal with the out-of-order and
> the
> > > > > >> idleness
> > > > > >>>>>>> of the
> > > > > >>>>>>>>> data.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Please let me know if you have further questions.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Best,
> > > > > >>>>>>>>> Xuannan
> > > > > >>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > > > tonysong...@gmail.com
> > > > > >>>> <mailto:tonysong...@gmail.com>>,
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> +1 in general.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> A quick question, could you explain why we are relying
> on
> > > the
> > > > > >>>>>>> watermark
> > > > > >>>>>>>>> for
> > > > > >>>>>>>>>> emitting the record attribute? Why not use timestamps
> in the
> > > > > >>>>>>> records? I
> > > > > >>>>>>>>>> don't see any concern in using watermarks. Just
> wondering if
> > > > > >>>>>> there's
> > > > > >>>>>>> any
> > > > > >>>>>>>>>> deep considerations behind this.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Best,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Xintong
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > > > suxuanna...@gmail.com
> > > > > >>>> <mailto:suxuanna...@gmail.com>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> > > > > > >>>>>>>>>>> operators to determine isProcessingBacklog based on watermark
> > > > > > >>>>>>>>>>> lag[1]. We had several discussions with Dong Ling about the
> > > > > > >>>>>>>>>>> design, and thanks for all the valuable advice.
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> The FLIP targets the use case where users want to run a Flink job
> > > > > > >>>>>>>>>>> to backfill historical data in a high-throughput manner and then
> > > > > > >>>>>>>>>>> continue processing real-time data with low latency. Building upon
> > > > > > >>>>>>>>>>> the backlog concept introduced in FLIP-309[2], this proposal
> > > > > > >>>>>>>>>>> enables sources to report their status of processing backlog based
> > > > > > >>>>>>>>>>> on the watermark lag.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> We would greatly appreciate any comments or feedback
> you
> > > may
> > > > > have
> > > > > >>>>>>> on
> > > > > >>>>>>>>> this
> > > > > >>>>>>>>>>> proposal.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best,
> > > > > >>>>>>>>>>> Xuannan
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> [1]
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > >>>> <
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > >>>>>
> > > > > >>>>>>>>>>> [2]
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > >>>> <
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > >>
> > > > > >>
> > > > > >>
> > > > >
> > > > >
> > >
>
