Hi Dong,

Please see my comments inline below.

> Hmm.. can you explain what you mean by "different watermark delay
> definitions for each source"?

For example, "table1" defines a watermark with a 5-second delay and
"table2" defines one with a 10-second delay, so they have different
watermark delay definitions. It is therefore also reasonable for them to
have different watermark lag thresholds, e.g., "table1" allows "10mins" and
"table2" allows "20mins".
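
To make the per-source idea concrete, here is a minimal sketch in plain Java.
It is not Flink API: the class, the method, and the threshold map are
hypothetical names used only for illustration. The only thing taken from the
FLIP discussion is the definition watermarkLag = CurrentTime - Watermark.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
class PerSourceLagThresholdSketch {

    // Assumed per-source thresholds, mirroring the "table1"/"table2" example.
    static final Map<String, Duration> LAG_THRESHOLDS = Map.of(
            "table1", Duration.ofMinutes(10),
            "table2", Duration.ofMinutes(20));

    // watermarkLag = CurrentTime - Watermark, as defined in the FLIP discussion.
    static boolean exceedsThreshold(String source, long currentTimeMs, long watermarkMs) {
        long lagMs = currentTimeMs - watermarkMs;
        return lagMs > LAG_THRESHOLDS.get(source).toMillis();
    }

    public static void main(String[] args) {
        long now = Duration.ofHours(1).toMillis();
        // Both sources see a watermark that is 15 minutes behind the current time.
        long watermark = now - Duration.ofMinutes(15).toMillis();
        System.out.println("table1: " + exceedsThreshold("table1", now, watermark));
        System.out.println("table2: " + exceedsThreshold("table2", now, watermark));
    }
}
```

With the same 15-minute lag, "table1" is over its threshold while "table2" is
not, which a single job-level threshold cannot express.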

> I think there is probably misunderstanding here. FLIP-309 does NOT directly
> specify when backlog is false. It is intentionally specified in such a way
> that there will not be any conflict between these rules.

Do you mean FLIP-309 doesn't allow specifying that backlog is false?
Is this mentioned in FLIP-309? This is completely different from my
understanding. The API "ctx.setIsProcessingBacklog(boolean)" allows users
to invoke "setIsProcessingBacklog(false)", and FLIP-309 also says "MySQL CDC
source should report isProcessingBacklog=false at the beginning of the
changelog stage." If that is not the intent, maybe we need to revisit
FLIP-309.
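
Here is how I read the interaction between the two rules, as a minimal
plain-Java sketch. All names are hypothetical and this is not Flink code:
"sourceReported" stands for whatever value the source passed to
setIsProcessingBacklog, and the logical-OR combination is the rule proposed
earlier in this thread, which I am assuming here rather than claiming it is
implemented this way.

```java
// Hypothetical illustration only; this is not Flink code.
class BacklogCombinationSketch {

    // Combines the source's own report with the job-level watermark-lag rule
    // using logical OR (the combination proposed in the discussion).
    static boolean effectiveBacklog(boolean sourceReported, long watermarkLagMs, long thresholdMs) {
        boolean lagRule = watermarkLagMs > thresholdMs;
        return sourceReported || lagRule;
    }

    public static void main(String[] args) {
        long thresholdMs = 60_000L; // assumed job-level threshold of one minute
        // The source reported false at the start of the changelog stage,
        // but the watermark lag is still above the job-level threshold.
        System.out.println(effectiveBacklog(false, 120_000L, thresholdMs));
        // The source reported false and the lag is within the threshold.
        System.out.println(effectiveBacklog(false, 10_000L, thresholdMs));
    }
}
```

In the first case the source explicitly reported false, yet the effective
status is true. That overlap between the two rules is what I am asking about.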

Best,
Jark



On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com> wrote:

> Hi Jark,
>
> Do you have any follow-up comment?
>
> My gut feeling is that if we need to support per-source watermark lag
> specification in the future (not sure we have a use-case for this right
> now), we can add such a config with a follow-up FLIP. The
> job-level config will still be useful as it makes users' configuration
> simpler for common scenarios.
>
> If it is OK, can we agree to make incremental progress for Flink and start
> a voting thread for this FLIP?
>
> Thanks,
> Dong
>
>
> On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline.
> >
> > >  As a result, the proposed job-level
> > > config will be applied only in the changelog stage. So there is no
> > > difference between these two approaches in this particular case, right?
> >
> > How can the job-level config be applied ONLY in the changelog stage?
> > I think it is only possible if it is implemented by the CDC source itself,
> > because the framework doesn't know which stage the source is in.
> > Note that the CDC source may emit watermarks with a very small lag
> > in the snapshot stage, and the job-level config may then turn the backlog
> > status to false.
> >
> > > On the other hand, per-source config will be necessary if users want to
> > > apply different watermark lag thresholds for different sources in the
> > > same job.
> >
> > We also have different watermark delay definitions for each source, so
> > I think it is also reasonable and necessary to have different watermark
> > lags.
> >
> >
> > > Each source can have its own rule that specifies when the backlog can be
> > > true (e.g. MySql CDC says the backlog should be true during the snapshot
> > > stage). And we can have a job-level config that specifies when the backlog
> > > should be true. Note that it is designed in such a way that none of these
> > > rules specify when the backlog should be false. That is why there is no
> > > conflict by definition.
> >
> > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify both when the
> > backlog is true and when it is FALSE. This conflicts with the job-level
> > config, as the latter can turn the status to true.
> >
> > > If I understand your comments correctly, you mean that we might have a
> > > Flink SQL DDL with user-defined watermark expressions. And users also
> > > want to set the backlog to true if the watermark generated by that
> > > user-specified expression exceeds a threshold.
> >
> > No. I mean the source may not support generating watermarks, so the
> > watermark expression is applied in a following operator (instead of in
> > the source operator). As a result, the watermark lag config doesn't work
> > in this case, which will confuse users.
> >
> > > You are right that this is a limitation. However, this is only a
> > > short-term limitation which we added to make sure that we can focus on
> > > the capability to switch from backlog=true to backlog=false. In the
> > > future, we will remove this limitation and also support switching from
> > > backlog=false to backlog=true.
> >
> > I can understand it may be difficult to support runtime mode switching
> > back and forth.
> > However, I think this should be a limitation of FLIP-327, not FLIP-328.
> > IIUC, FLIP-309 doesn't have this limitation, right? I just don't
> > understand what the challenge is in switching a flag.
> >
> > Best,
> > Jark
> >
> >
> > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jark,
> > >
> > > Thanks for the comments. Please see my comments inline.
> > >
> > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
> > >
> > > > Hi Xuannan,
> > > >
> > > > I leave my comments inline.
> > > >
> > > > > In the case where a user wants to
> > > > > use a CDC source and also determine backlog status based on watermark
> > > > > lag, we still need to define the rule when that occurs
> > > >
> > > > This rule should be defined by the source itself (who knows backlog
> > > > best), not by the framework. In the case of CDC source, it reports
> > > > isBacklog=true during snapshot stage, and reports isBacklog=false
> > > > during changelog stage if watermark-lag is within the threshold.
> > > >
> > >
> > > I am not sure I fully understand the difference between adding a
> > > job-level config vs. adding a per-source config.
> > >
> > > In the case of CDC, its watermark lag should be either undefined or
> > > really large in the snapshot stage. As a result, the proposed job-level
> > > config will be applied only in the changelog stage. So there is no
> > > difference between these two approaches in this particular case, right?
> > >
> > > There are two advantages of the job-level config over per-source config:
> > >
> > > 1) Configuration is simpler. For example, suppose a user has a Flink job
> > > that consumes records from multiple Kafka sources and wants to determine
> > > backlog status for these Kafka sources using the same watermark lag
> > > threshold. Then there is no need for the user to repeatedly specify this
> > > threshold for each source.
> > >
> > > 2) There is a smaller number of public APIs overall. In particular,
> > > instead of repeatedly adding a setProcessingBacklogWatermarkLagThreshold()
> > > API for every source operator that has event-time watermark lag defined,
> > > we only need to add one job-level config. Less public API means better
> > > simplicity and maintainability in general.
> > >
> > > On the other hand, per-source config will be necessary if users want to
> > > apply different watermark lag thresholds for different sources in the
> > > same job. Personally, I find it a bit counter-intuitive for users to
> > > specify different watermark lag thresholds in the same job.
> > >
> > > Do you think there is any real-world use-case that requires this? Could
> > > you provide a specific use-case where per-source config can provide an
> > > advantage over the job-level config?
> > >
> > >
> > > > I think it's not intuitive to combine it with the logical OR operation.
> > > > Even for the combination logic of backlog status from different
> > > > channels, FLIP-309 said it is "up to the operator to determine its
> > > > output records' isBacklog value" and proposed 3 different strategies.
> > > > Therefore, I think backlog status from a single source should be up to
> > > > the source.
> > >
> > >
> > > For both the job-level config and the per-source config, it is eventually
> > > up to the user to decide the computation logic of the backlog status.
> > > Whether this mechanism is implemented at the per-source level or
> > > framework level is probably more like an implementation detail.
> > >
> > > Eventually, I think the choice between these two approaches depends on
> > > whether we have any use-case for users to specify different watermark lag
> > > thresholds in the same job.
> > >
> > >
> > > >
> > > > IMO, a better API design is not how to resolve conflicts but not
> > > > introducing conflicts.
> > >
> > >
> > > Just to clarify, the current FLIP does not introduce any conflict. Each
> > > source can have its own rule that specifies when the backlog can be true
> > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > > stage). And we can have a job-level config that specifies when the backlog
> > > should be true. Note that it is designed in such a way that none of these
> > > rules specify when the backlog should be false. That is why there is no
> > > conflict by definition.
> > >
> > >
> > >
> > > > Letting the source determine backlog status removes the conflicts and I
> > > > don't see big disadvantages.
> > > >
> > > > > It should not confuse the user that
> > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > >
> > > > Hmm, so this configuration may confuse Flink SQL users, because all
> > > > watermarks are defined on the source DDL, but it may use a separate
> > > > operator to emit watermarks if the source doesn't support emitting
> > > > watermarks.
> > > >
> > >
> > > If I understand your comments correctly, you mean that we might have a
> > > Flink SQL DDL with user-defined watermark expressions. And users also
> > > want to set the backlog to true if the watermark generated by that
> > > user-specified expression exceeds a threshold.
> > >
> > > That is a good point and use-case. I agree we should also cover this
> > > scenario. And we can update FLIP-328 to mention that the job-level config
> > > will also be applicable when the watermark derived from the Flink SQL DDL
> > > exceeds this threshold. Would this address your concern?
> > >
> > >
> > > >
> > > > > I think the description in the FLIP actually means the other way
> > > > > around, where the job can never switch back to batch mode once it has
> > > > > switched into streaming mode. This is to align with the current state
> > > > > of FLIP-327[1], where only switching from batch to stream mode is
> > > > > supported.
> > > >
> > > > This sounds like a limitation of FLIP-327 (that execution mode depends
> > > > on backlog status).
> > > > But the backlog status shouldn't have this limitation, because it is
> > > > not only used for execution switching.
> > > >
> > >
> > > You are right that this is a limitation. However, this is only a
> > > short-term limitation which we added to make sure that we can focus on
> > > the capability to switch from backlog=true to backlog=false. In the
> > > future, we will remove this limitation and also support switching from
> > > backlog=false to backlog=true.
> > >
> > > The capability to switch from backlog=true to backlog=false will
> > > mitigate a lot of problems we are facing now, as it is common for users
> > > to start a Flink job to process backlog data followed by real-time data.
> > > On the other hand, switching from backlog=false to backlog=true is useful
> > > when there is a traffic spike while the Flink job is processing real-time
> > > data, which is also useful to address but less important than the
> > > previous one.
> > >
> > > Given that both features require considerable changes to the underlying
> > > runtime, we think it might be useful and safe to tackle them one by one.
> > >
> > > Thanks again for the comments. Please let us know what you think.
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > >
> > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Jark and Leonard,
> > > > >
> > > > > Thanks for the comments. Please see my reply below.
> > > > >
> > > > > @Jark
> > > > >
> > > > > > I think a better API doesn't compete with itself. Therefore, I'm in
> > > > > > favor of supporting the watermark lag threshold for each source without
> > > > > > introducing any framework API and configuration.
> > > > >
> > > > > I don't think supporting the watermark lag threshold for each source
> > > > > can avoid the competition problem. In the case where a user wants to
> > > > > use a CDC source and also determine backlog status based on watermark
> > > > > lag, we still need to define the rule when that occurs. With that
> > > > > said, I think it is more intuitive to combine it with the logical OR
> > > > > operation, as the strategies (FLIP-309, FLIP-328) only determine when
> > > > > the source's backlog status should be True. What do you think?
> > > > >
> > > > > > Besides, this can address another concern that the watermark may be
> > > > > > generated by DataStream#assignTimestampsAndWatermarks which doesn't
> > > > > > work with the backlog.watermark-lag-threshold job config
> > > > >
> > > > > The description of the configuration explicitly states that "a source
> > > > > would report isProcessingBacklog=true if its watermark lag exceeds the
> > > > > configured value". It should not confuse the user that
> > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > > >
> > > > > > Does that mean the job can never go back to streaming mode once it
> > > > > > switches into backlog mode? It sounds like not a complete FLIP to me.
> > > > > > Is it possible to support switching back in this FLIP?
> > > > >
> > > > > I think the description in the FLIP actually means the other way
> > > > > around, where the job can never switch back to batch mode once it has
> > > > > switched into streaming mode. This is to align with the current state
> > > > > of FLIP-327[1], where only switching from batch to stream mode is
> > > > > supported.
> > > > >
> > > > > @Leonard
> > > > >
> > > > > > > The FLIP describe that: And it should report
> > > > isProcessingBacklog=false
> > > > > at the beginning of the snapshot stage.
> > > > > > This should be “changelog stage”
> > > > >
> > > > > I think the description is in FLIP-309. Thanks for pointing out. I
> > > > > updated the description.
> > > > >
> > > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > > > > Source. Although we are pushing the sourceFunction API to be removed,
> > > > > > these APIs will survive one or two versions in flink repo before they
> > > > > > are actually removed.
> > > > >
> > > > > I agree that it is good to support the SourceFunction API. However,
> > > > > given that the SourceFunction API is marked as deprecated, I think I
> > > > > will prioritize supporting the FLIP-27 Source. We can support the
> > > > > SourceFunction API after the FLIP-27 source. What do you think?
> > > > >
> > > > > Best regards,
> > > > > Xuannan
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com>
> wrote:
> > > > > >
> > > > > > Thanks Xuannan for driving this FLIP !
> > > > > >
> > > > > > The proposal generally looks good to me, but I still left some
> > > > comments:
> > > > > >
> > > > > > > One more question about the FLIP is that the FLIP says "Note
> that
> > > > this
> > > > > > > config does not support switching source's isProcessingBacklog
> > from
> > > > > false to true
> > > > > > > for now.” Does that mean the job can never back to streaming
> mode
> > > > once
> > > > > switches into
> > > > > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> > > > possible
> > > > > to
> > > > > > > support switching back in this FLIP?
> > > > > > +1 for Jark’s concern, IIUC, the state transition of IsProcessingBacklog
> > > > > > depends on whether the source is processing backlog data or not.
> > > > > > Different sources will have different backlog status, which may
> > > > > > change over time. From a general perspective, we should not have this
> > > > > > restriction.
> > > > > >
> > > > > > > The FLIP describe that: And it should report
> > > > isProcessingBacklog=false
> > > > > at the beginning of the snapshot stage.
> > > > > > This should be “changelog stage”
> > > > > >
> > > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > > > > Source. Although we are pushing the sourceFunction API to be removed,
> > > > > > these APIs will survive one or two versions in flink repo before they
> > > > > > are actually removed.
> > > > > >
> > > > > > Best,
> > > > > > Leonard
> > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > >
> > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <
> suxuanna...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> Thank you for all the reviews and suggestions.
> > > > > > >>
> > > > > > >> I believe all the comments have been addressed. If there are
> no
> > > > > > >> further comments, I plan to open the voting thread for this
> FLIP
> > > > early
> > > > > > >> next week.
> > > > > > >>
> > > > > > >> Best regards,
> > > > > > >> Xuannan
> > > > > > >>
> > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> > > <j...@ververica.com.invalid
> > > > >
> > > > > > >> wrote:
> > > > > > >>>
> > > > > > >>> Hi Xuannan,
> > > > > > >>>
> > > > > > >>> I thought FLIP-328 will compete with FLIP-309 while setting
> the
> > > > > value of
> > > > > > >>> the backlog. Understood. Thanks for the hint.
> > > > > > >>>
> > > > > > >>> Best regards,
> > > > > > >>> Jing
> > > > > > >>>
> > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> > > suxuanna...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>>
> > > > > > >>>> Hi Jing,
> > > > > > >>>>
> > > > > > >>>> Thank you for the clarification.
> > > > > > >>>>
> > > > > > > >>>> For the use case you mentioned, I believe we can utilize the
> > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to determine the backlog
> > > > > > > >>>> status. For example, if the user wants to process data before time T
> > > > > > > >>>> in batch mode and after time T in stream mode, they can set the first
> > > > > > > >>>> source of the HybridSource to read up to time T and the last source of
> > > > > > > >>>> the HybridSource to read from time T.
> > > > > > >>>>
> > > > > > >>>> Best,
> > > > > > >>>> Xuannan
> > > > > > >>>>
> > > > > > >>>> [1]
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> > > > <j...@ververica.com.invalid
> > > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>>
> > > > > > >>>>> Hi Xuannan,
> > > > > > >>>>>
> > > > > > >>>>> Thanks for the clarification.
> > > > > > >>>>>
> > > > > > > >>>>> 3. Event time and process time are two different things. It might be
> > > > > > > >>>>> rarely used, but conceptually, users can process data in the past
> > > > > > > >>>>> within a specific time range in the streaming mode. All data before
> > > > > > > >>>>> that range will be considered backlog and needs to be processed in
> > > > > > > >>>>> batch mode, like, e.g. the Present Perfect Progressive tense used in
> > > > > > > >>>>> the English language.
> > > > > > >>>>>
> > > > > > >>>>> Best regards,
> > > > > > >>>>> Jing
> > > > > > >>>>>
> > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> > > > suxuanna...@gmail.com>
> > > > > > >>>> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Hi Jing,
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks for the reply.
> > > > > > >>>>>>
> > > > > > >>>>>> 1. You are absolutely right that the watermark lag
> threshold
> > > > must
> > > > > > >> be
> > > > > > >>>>>> carefully set with a thorough understanding of watermark
> > > > > > >> generation.
> > > > > > >>>> It is
> > > > > > >>>>>> crucial for users to take into account the
> WatermarkStrategy
> > > > when
> > > > > > >>>> setting
> > > > > > >>>>>> the watermark lag threshold.
> > > > > > >>>>>>
> > > > > > >>>>>> 2. Regarding pure processing-time based stream processing
> > > jobs,
> > > > > > >>>>>> alternative strategies will be implemented to determine
> > > whether
> > > > > the
> > > > > > >>>> job is
> > > > > > >>>>>> processing backlog data. I have outlined two possible
> > > strategies
> > > > > > >> below:
> > > > > > >>>>>>
> > > > > > >>>>>> - Based on the source operator's state. For example, when
> > > MySQL
> > > > > CDC
> > > > > > >>>> source
> > > > > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > > > > >>>>>> - Based on metrics. For example, when busyTimeMsPerSecond
> > (or
> > > > > > >>>>>> backPressuredTimeMsPerSecond) > user_specified_threshold,
> > then
> > > > > > >>>>>> isBacklog=true.
> > > > > > >>>>>>
> > > > > > > >>>>>> As for the strategy proposed in this FLIP, it relies on generated
> > > > > > > >>>>>> watermarks. Therefore, if a user intends for the job to detect backlog
> > > > > > > >>>>>> status based on watermark, it is necessary to generate the watermark.
> > > > > > >>>>>>
> > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question. From
> my
> > > > > > >>>> understanding,
> > > > > > >>>>>> it should work in both cases. When event times are close
> to
> > > the
> > > > > > >>>> processing
> > > > > > >>>>>> time, resulting in watermarks close to the processing
> time,
> > > the
> > > > > > >> job is
> > > > > > >>>> not
> > > > > > >>>>>> processing backlog data. On the other hand, when event
> times
> > > are
> > > > > > >> far
> > > > > > >>>> from
> > > > > > >>>>>> processing time, causing watermarks to also be distant, if
> > the
> > > > lag
> > > > > > >>>>>> surpasses the defined threshold, the job is considered
> > > > processing
> > > > > > >>>> backlog
> > > > > > >>>>>> data.
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Xuannan
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> > > <j...@ververica.com.INVALID
> > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>> Hi Xuannan,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for the clarification. That is the part where I am
> > > > trying
> > > > > > >> to
> > > > > > >>>>>>> understand your thoughts. I have some follow-up
> questions:
> > > > > > >>>>>>>
> > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and on what the
> > > > > > > >>>>>>> customized watermark generation looks like. It mixes business logic
> > > > > > > >>>>>>> with technical implementation and technical data processing mode. The
> > > > > > > >>>>>>> value of the watermark lag threshold must be set very carefully. If the
> > > > > > > >>>>>>> value is too small, then any time the watermark generation logic is
> > > > > > > >>>>>>> changed (business logic changes lead to the threshold getting exceeded),
> > > > > > > >>>>>>> the same job might surprisingly be running in backlog processing mode,
> > > > > > > >>>>>>> i.e. a butterfly effect. Comprehensive documentation is required to
> > > > > > > >>>>>>> avoid any confusion for the users.
> > > > > > >>>>>>> 2. Like Jark already mentioned, use cases that do not
> have
> > > > > > >>>> watermarks,
> > > > > > >>>>>>> like pure processing-time based stream processing[1] are
> > not
> > > > > > >>>> covered. It
> > > > > > >>>>>> is
> > > > > > >>>>>>> more or less a trade-off solution that does not support
> > such
> > > > use
> > > > > > >>>> cases
> > > > > > >>>>>> and
> > > > > > >>>>>>> appropriate documentation is required. Forcing them to
> > > > explicitly
> > > > > > >>>>>> generate
> > > > > > >>>>>>> watermarks that are never needed just because of this
> does
> > > not
> > > > > > >> sound
> > > > > > >>>>>> like a
> > > > > > >>>>>>> proper solution.
> > > > > > >>>>>>> 3. If I am not mistaken, it only works for use cases
> where
> > > > event
> > > > > > >>>> times
> > > > > > >>>>>> are
> > > > > > >>>>>>> very close to the processing times, because the wall
> clock
> > is
> > > > > > >> used to
> > > > > > >>>>>>> calculate the watermark lag and the watermark is
> generated
> > > > based
> > > > > > >> on
> > > > > > >>>> the
> > > > > > >>>>>>> event time.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Best regards,
> > > > > > >>>>>>> Jing
> > > > > > >>>>>>>
> > > > > > >>>>>>> [1]
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > > > > >> suxuanna...@gmail.com>
> > > > > > >>>>>> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi Jing,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thank you for the suggestion.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>> The definition of watermark lag is the same as the watermarkLag metric
> > > > > > > >>>>>>>> in FLIP-33[1]. More specifically, the watermark lag is computed at the
> > > > > > > >>>>>>>> time when a watermark is emitted downstream in the following way:
> > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added this description
> > > > > > > >>>>>>>> to the FLIP.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I hope this addresses your concern.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>> Xuannan
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> [1]
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> > > > <j...@ververica.com.INVALID
> > > > > > >>>
> > > > > > >>>> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Hi Xuannan,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> There is one tiny thing that I am not sure if I
> > understand
> > > it
> > > > > > >>>>>> correctly.
> > > > > > >>>>>>>>> Since there will be many different WatermarkStrategies
> > and
> > > > > > >>>> different
> > > > > > >>>>>>>>> WatermarkGenerators. Could you please update the FLIP
> and
> > > add
> > > > > > >> the
> > > > > > >>>>>>>>> description of how the watermark lag is calculated
> > exactly?
> > > > > > >> E.g.
> > > > > > >>>>>>>> Watermark
> > > > > > >>>>>>>>> lag = A - B with A is the timestamp of the watermark
> > > emitted
> > > > > > >> to the
> > > > > > >>>>>>>>> downstream and B is....(this is the part I am not
> really
> > > sure
> > > > > > >> after
> > > > > > >>>>>>>> reading
> > > > > > >>>>>>>>> the FLIP).
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Best regards,
> > > > > > >>>>>>>>> Jing
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > > > > >> suxuanna...@gmail.com>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Hi Jark,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thanks for the comments.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I agree that the current solution cannot support jobs
> > that
> > > > > > >> cannot
> > > > > > >>>>>> define
> > > > > > >>>>>>>>>> watermarks. However, after considering the
> > > > > > >> pending-record-based
> > > > > > >>>>>>>> solution, I
> > > > > > >>>>>>>>>> believe the current solution is superior for the
> target
> > > use
> > > > > > >> case
> > > > > > >>>> as it
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>> more intuitive for users. The backlog status gives
> users
> > > the
> > > > > > >>>> ability
> > > > > > >>>>>> to
> > > > > > >>>>>>>>>> balance between throughput and latency. Making this
> > > > trade-off
> > > > > > >>>> decision
> > > > > > >>>>>>>>>> based on the watermark lag is more intuitive from the
> > > user's
> > > > > > >>>>>>>> perspective.
> > > > > > >>>>>>>>>> For instance, a user can decide that if the job lags
> > > behind
> > > > > > >> the
> > > > > > >>>>>> current
> > > > > > >>>>>>>>>> time by more than 1 hour, the result is not usable. In
> > > that
> > > > > > >> case,
> > > > > > >>>> we
> > > > > > >>>>>> can
> > > > > > >>>>>>>>>> optimize for throughput when the data lags behind by
> > more
> > > > > > >> than an
> > > > > > >>>>>> hour.
> > > > > > >>>>>>>>>> With the pending-record-based solution, it's
> challenging
> > > for
> > > > > > >>>> users to
> > > > > > >>>>>>>>>> determine when to optimize for throughput and when to
> > > > > > >> prioritize
> > > > > > >>>>>>>> latency.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Regarding the limitations of the watermark-based
> > solution:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 1. The current solution can support jobs with sources
> > that
> > > > > > >> have
> > > > > > >>>> event
> > > > > > >>>>>>>>>> time. Users can always define a watermark at the
> source
> > > > > > >> operator,
> > > > > > >>>> even
> > > > > > >>>>>>>> if
> > > > > > >>>>>>>>>> it's not used by downstream operators, such as
> streaming
> > > > join
> > > > > > >> and
> > > > > > >>>>>>>> unbounded
> > > > > > >>>>>>>>>> aggregate.
> > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that the watermark lag will
> > > > > > > >>>>>>>>>> keep increasing if no data is generated in Kafka. The watermark lag and
> > > > > > > >>>>>>>>>> backlog status are determined at the moment when the watermark is
> > > > > > > >>>>>>>>>> emitted to the downstream operator. If no data is emitted from the
> > > > > > > >>>>>>>>>> source, the watermark lag and backlog status will not be updated. If
> > > > > > > >>>>>>>>>> the WatermarkStrategy with idleness is used, the source becomes
> > > > > > > >>>>>>>>>> non-backlog when it becomes idle.
> > > > > > > >>>>>>>>>> 3. I think watermark lag is a more intuitive way to determine if a job
> > > > > > > >>>>>>>>>> is processing backlog data. Even when using pending records, it faces a
> > > > > > > >>>>>>>>>> similar issue. For example, if the source has 1K pending records, those
> > > > > > > >>>>>>>>>> records can span from 1 day to 1 hour to 1 second. If the records span
> > > > > > > >>>>>>>>>> 1 day, it's probably best to optimize for throughput. If they span 1
> > > > > > > >>>>>>>>>> hour, it depends on the business logic. If they span 1 second,
> > > > > > > >>>>>>>>>> optimizing for latency is likely the better choice.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> In summary, I believe the watermark-based solution is
> a
> > > > > > >> superior
> > > > > > >>>>>> choice
> > > > > > >>>>>>>>>> for the target use case where watermark/event time can
> > be
> > > > > > >> defined.
> > > > > > >>>>>>>>>> Additionally, I haven't come across a scenario that
> > > requires
> > > > > > >>>>>> low-latency
> > > > > > >>>>>>>>>> processing and reads from a source that cannot define
> > > > > > >> watermarks.
> > > > > > >>>> If
> > > > > > >>>>>> we
> > > > > > >>>>>>>>>> encounter such a use case, we can create another FLIP
> to
> > > > > > >> address
> > > > > > >>>> those
> > > > > > >>>>>>>>>> needs in the future. What do you think?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Best,
> > > > > > >>>>>>>>>> Xuannan
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com
> > > > > > >> <mailto:
> > > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi Xuannan,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thanks for opening this discussion.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> This current proposal may work in the mentioned
> > watermark
> > > > > > >> cases.
> > > > > > >>>>>>>>>>> However, it seems this is not a general solution for
> > > > sources
> > > > > > >> to
> > > > > > >>>>>>>> determine
> > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > >>>>>>>>>>> From my point of view, there are 3 limitations of the
> > > > current
> > > > > > >>>>>> proposal:
> > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> > > > watermark/event-time
> > > > > > >>>>>> defined,
> > > > > > >>>>>>>>>>> for example streaming join and unbounded aggregate.
> We
> > > may
> > > > > > >> still
> > > > > > >>>> need
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>> figure out solutions for them.
> > > > > > > >>>>>>>>>>> 2. Watermark lag cannot be trusted, because it
> > increases
> > > > > > > >>>> without bound
> > > > > > >>>>>> if
> > > > > > >>>>>>>> no
> > > > > > > >>>>>>>>>>> data is generated in Kafka.
> > > > > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > > > > > >>>>>>>>>>> 3. Watermark lag can hardly reflect the amount of
> > > backlog.
> > > > > > >> If the
> > > > > > >>>>>>>>>> watermark
> > > > > > > >>>>>>>>>>> lag is 1 day or 1 hour or 1 second,
> > > > > > >>>>>>>>>>> there is possibly only 1 pending record there, which
> > > means
> > > > no
> > > > > > >>>> backlog
> > > > > > >>>>>>>> at
> > > > > > >>>>>>>>>>> all.
> > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Therefore, IMO, watermark may not be the ideal metric
> > used
> > > > to
> > > > > > >>>>>> determine
> > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > >>>>>>>>>>> What we need is something that reflects the number of
> > > > records
> > > > > > >>>>>>>> unprocessed
> > > > > > >>>>>>>>>>> by the job.
> > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric
> proposed
> > in
> > > > > > >>>> FLIP-33 and
> > > > > > >>>>>>>> has
> > > > > > >>>>>>>>>>> been implemented by Kafka source.
> > > > > > >>>>>>>>>>> Did you consider using "pendingRecords" metric to
> > > determine
> > > > > > >>>>>>>>>>> "isProcessingBacklog"?
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>> Jark
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> [1]
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >>>>>>>>>> <
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > > > > >>>> tonysong...@gmail.com
> > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Sounds good to me.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> It is true that, if we are introducing the
> generalized
> > > > > > >>>> watermark,
> > > > > > >>>>>>>> there
> > > > > > >>>>>>>>>>>> will be other watermark related concepts /
> > > configurations
> > > > > > >> that
> > > > > > >>>> need
> > > > > > >>>>>> to
> > > > > > >>>>>>>>>> be
> > > > > > >>>>>>>>>>>> updated anyway.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Xintong
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > > > > >>>> suxuanna...@gmail.com
> > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Hi Xintong,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> After considering the idea of using a general
> > > > configuration
> > > > > > >>>> key, I
> > > > > > >>>>>>>>>> think
> > > > > > >>>>>>>>>>>>> it may not be a good idea for the reasons below.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> While I agree that using a more general
> configuration
> > > key
> > > > > > >>>> provides
> > > > > > >>>>>> us
> > > > > > >>>>>>>>>>>> with
> > > > > > >>>>>>>>>>>>> the flexibility to switch to other approaches to
> > > > calculate
> > > > > > >> the
> > > > > > >>>> lag
> > > > > > >>>>>> in
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>> future, the downside is that it may cause confusion
> > for
> > > > > > >> users.
> > > > > > >>>> We
> > > > > > >>>>>>>>>>>> currently
> > > > > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> > > > watermarkLag
> > > > > > >> in
> > > > > > >>>> the
> > > > > > >>>>>>>>>> source,
> > > > > > >>>>>>>>>>>>> and it is not clear which specific lag we are
> > referring
> > > > to.
> > > > > > >>>> With
> > > > > > >>>>>> the
> > > > > > >>>>>>>>>>>>> potential introduction of the Generalized Watermark
> > > > > > >> mechanism
> > > > > > >>>> in
> > > > > > >>>>>> the
> > > > > > >>>>>>>>>>>>> future, if I understand correctly, a watermark
> won't
> > > > > > >>>> necessarily
> > > > > > >>>>>> need
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>> be
> > > > > > > >>>>>>>>>>>>> a timestamp. I am concerned that the general
> > > configuration
> > > > > > >> key
> > > > > > >>>> may
> > > > > > >>>>>> not
> > > > > > >>>>>>>>>> be
> > > > > > > >>>>>>>>>>>>> enough to cover all the use cases and we will need
> to
> > > > > > >> introduce
> > > > > > >>>> a
> > > > > > >>>>>>>>>> general
> > > > > > >>>>>>>>>>>>> way to determine the backlog status regardless.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the
> > > > > > >> configuration
> > > > > > >>>> as
> > > > > > >>>>>> is,
> > > > > > >>>>>>>>>> and
> > > > > > > >>>>>>>>>>>>> change it later with a deprecation process or
> > > > migration
> > > > > > >>>>>> process.
> > > > > > >>>>>>>>>> What
> > > > > > >>>>>>>>>>>>> do you think?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>> Xuannan
> > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > > > > >>>> tonysong...@gmail.com
> > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > >>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this
> detail
> > > via
> > > > > > >> the
> > > > > > >>>>>>>>>>>>> configuration
> > > > > > >>>>>>>>>>>>>> option. To be specific, I suggest not mentioning
> the
> > > > > > >>>> "watermark"
> > > > > > >>>>>>>>>>>> keyword
> > > > > > >>>>>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>> the configuration key and description.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they only
> > need
> > > to
> > > > > > >> know
> > > > > > > >>>>>>>> that, if there's
> > > > > > >>>>>>>>>> a
> > > > > > >>>>>>>>>>>>>> lag higher than the given threshold, Flink will
> > > consider
> > > > > > >>>> latency
> > > > > > >>>>>> of
> > > > > > >>>>>>>>>>>>>> individual records as less important and
> prioritize
> > > > > > >> throughput
> > > > > > >>>>>> over
> > > > > > >>>>>>>>>> it.
> > > > > > >>>>>>>>>>>>>> They don't really need the details of how the lags
> > are
> > > > > > >>>> calculated.
> > > > > > >>>>>>>>>>>>>> - For the internal implementation, I also think
> > using
> > > > > > >>>> watermark
> > > > > > >>>>>> lags
> > > > > > >>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>> a good idea, for the reasons you've already
> > mentioned.
> > > > > > >>>> However,
> > > > > > >>>>>> it's
> > > > > > >>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>> the only possible option. Hiding this detail from
> > > users
> > > > > > >> would
> > > > > > >>>> give
> > > > > > >>>>>>>> us
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> flexibility to switch to other approaches if
> needed
> > in
> > > > > > >> future.
> > > > > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > > > > >> ProcessFunction
> > > > > > >>>> API
> > > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an
> > idea
> > > to
> > > > > > >>>>>> introduce a
> > > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically
> the
> > > > > > >>>> watermark can
> > > > > > >>>>>>>> be
> > > > > > >>>>>>>>>>>>>> anything that needs to travel along the data-flow
> > with
> > > > > > >> certain
> > > > > > >>>>>>>>>>>> alignment
> > > > > > >>>>>>>>>>>>>> strategies, and event time watermark would be one
> > > > specific
> > > > > > >>>> case of
> > > > > > >>>>>>>> it.
> > > > > > >>>>>>>>>>>>> This
> > > > > > >>>>>>>>>>>>>> is still an idea and has not been discussed and
> > agreed
> > > > on
> > > > > > >> by
> > > > > > >>>> the
> > > > > > >>>>>>>>>>>>> community,
> > > > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we are
> > > going
> > > > > > >> for
> > > > > > >>>> it,
> > > > > > >>>>>> the
> > > > > > >>>>>>>>>>>>> concept
> > > > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd
> also
> > be
> > > > > > >> fine
> > > > > > >>>> with
> > > > > > >>>>>>>>>>>>>> introducing the configuration as is, and changing
> it
> > > > > > >> later, if
> > > > > > >>>>>>>> needed,
> > > > > > >>>>>>>>>>>>> with
> > > > > > >>>>>>>>>>>>>> a regular deprecation and migration process. Just
> > > making
> > > > > > >> my
> > > > > > >>>>>>>>>>>> suggestions.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Xintong
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > >>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Hi Xintong,
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp in the
> > records
> > > to
> > > > > > >>>> determine
> > > > > > >>>>>>>> the
> > > > > > > >>>>>>>>>>>>>>> backlog status, and decided to use watermark in
> the
> > > > end.
> > > > > > >> By
> > > > > > >>>>>>>>>>>> definition,
> > > > > > >>>>>>>>>>>>>>> watermark is the time progress indication in the
> > data
> > > > > > >>>> stream. It
> > > > > > >>>>>>>>>>>>> indicates
> > > > > > >>>>>>>>>>>>>>> the stream’s event time has progressed to some
> > > specific
> > > > > > >>>> time. On
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> other
> > > > > > >>>>>>>>>>>>>>> hand, timestamp in the records is usually used to
> > > > > > >> generate
> > > > > > >>>> the
> > > > > > >>>>>>>>>>>>> watermark.
> > > > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and
> > intuitive
> > > to
> > > > > > >>>> calculate
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> event
> > > > > > >>>>>>>>>>>>>>> time lag by watermark and determine the backlog
> > > status.
> > > > > > >> And
> > > > > > >>>> by
> > > > > > >>>>>>>> using
> > > > > > >>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with the
> out-of-order
> > > and
> > > > > > >> the
> > > > > > >>>>>>>> idleness
> > > > > > >>>>>>>>>>>>> of the
> > > > > > >>>>>>>>>>>>>>> data.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Please let me know if you have further questions.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>> Xuannan
> > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > > > > >>>>>> tonysong...@gmail.com
> > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > >>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> +1 in general.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> A quick question, could you explain why we are
> > > relying
> > > > > > >> on
> > > > > > >>>> the
> > > > > > >>>>>>>>>>>>> watermark
> > > > > > >>>>>>>>>>>>>>> for
> > > > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use
> > > timestamps
> > > > > > >> in the
> > > > > > >>>>>>>>>>>>> records? I
> > > > > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks. Just
> > > > > > >> wondering if
> > > > > > > >>>>>>>>>>>> there are
> > > > > > >>>>>>>>>>>>> any
> > > > > > >>>>>>>>>>>>>>>> deep considerations behind this.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Xintong
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > >>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328:
> > Allow
> > > > > > >> source
> > > > > > >>>>>>>>>>>>> operators to
> > > > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on
> watermark
> > > > > > >> lag[1]. We
> > > > > > > >>>>>> had
> > > > > > >>>>>>>>>>>>>>> several
> > > > > > > >>>>>>>>>>>>>>>>> discussions with Dong Lin about the design,
> and
> > > > thanks
> > > > > > >>>> for all
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>> valuable advice.
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case where users
> > > want
> > > > to
> > > > > > >>>> run a
> > > > > > >>>>>>>>>>>> Flink
> > > > > > >>>>>>>>>>>>>>> job to
> > > > > > >>>>>>>>>>>>>>>>> backfill historical data in a high throughput
> > > manner
> > > > > > >> and
> > > > > > >>>>>> continue
> > > > > > >>>>>>>>>>>>>>>>> processing real-time data with low latency.
> > > Building
> > > > > > >> upon
> > > > > > >>>> the
> > > > > > >>>>>>>>>>>>> backlog
> > > > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this
> proposal
> > > > > > >> enables
> > > > > > >>>>>> sources
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>> report
> > > > > > >>>>>>>>>>>>>>>>> their status of processing backlog based on the
> > > > > > >> watermark
> > > > > > >>>> lag.
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or
> > > feedback
> > > > > > >> you
> > > > > > >>>> may
> > > > > > >>>>>> have
> > > > > > >>>>>>>>>>>>> on
> > > > > > >>>>>>>>>>>>>>> this
> > > > > > >>>>>>>>>>>>>>>>> proposal.
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>>> Xuannan
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> [1]
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > >>>>>>>>>> <
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> [2]
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > >>>>>>>>>> <
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
