Hi Dong,

Please see my comments inline.

>  As a result, the proposed job-level
> config will be applied only in the changelog stage. So there is no
> difference between these two approaches in this particular case, right?

How the job-level config can be applied ONLY in the changelog stage?
I think it is only possible if it is implemented by the CDC source itself,
because the framework doesn't know which stage of the source is.
Know that the CDC source may emit watermarks with a very small lag
in the snapshot stage, and the job-level config may turn the backlog
status into false.

> On the other hand, per-source config will be necessary if users want to
> apply different watermark lag thresholds for different sources in the same
> job.

We also have different watermark delay definitions for each source,
I think this's also reasonable and necessary to have different watermark
lags.


> Each source can have its own rule that specifies when the backlog can be
true
> (e.g. MySql CDC says the backlog should be true during the snapshot
stage).
> And we can have a job-level config that specifies when the backlog should
> be true. Note that it is designed in such a way that none of these rules
> specify when the backlog should be false. That is why there is no conflict
> by definition.

IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the backlog
is true and when is FALSE. This conflicts with the job-level config as it
will turn
the status into true.

> If I understand your comments correctly, you mean that we might have a
> Flink SQL DDL with user-defined watermark expressions. And users also want
> to set the backlog to true if the watermark generated by that
> user-specified expression exceeds a threshold.

No. I mean the source may not support generating watermarks, so the
watermark
expression is applied in a following operator (instead of in the source
operator).
This will result in the watermark lag doesn't work in this case and confuse
users.

> You are right that this is a limitation. However, this is only a
short-term
> limitation which we added to make sure that we can focus on the capability
> to switch from backlog=true to backlog=false. In the future, we will
remove
> this limitation and also support switching from backlog=false to
> backlog=true.

I can understand it may be difficult to support runtime mode switching back
and forth.
However, I think this should be a limitation of FLIP-327, not FLIP-328.
IIUC,
FLIP-309 doesn't have this limitation, right? I just don't understand
what's the
challenge to switch a flag?

Best,
Jark


On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for the comments. Please see my comments inline.
>
> On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Xuannan,
> >
> > I leave my comments inline.
> >
> > > In the case where a user wants to
> > > use a CDC source and also determine backlog status based on watermark
> > > lag, we still need to define the rule when that occurs
> >
> > This rule should be defined by the source itself (who knows backlog
> best),
> > not by the framework. In the case of CDC source, it reports
> isBacklog=true
> > during snapshot stage, and report isBacklog=false during changelog stage
> if
> > watermark-lag is within the threshold.
> >
>
> I am not sure I fully understand the difference between adding a job-level
> config vs. adding a per-source config.
>
> In the case of CDC, its watermark lag should be either unde-defined or
> really large in the snapshot stage. As a result, the proposed job-level
> config will be applied only in the changelog stage. So there is no
> difference between these two approaches in this particular case, right?
>
> There are two advantages of the job-level config over per-source config:
>
> 1) Configuration is simpler. For example, suppose a user has a Flink job
> that consumes records from multiple Kafka sources and wants to determine
> backlog status for these Kafka sources using the same watermark lag
> threshold, there is no need for users to repeatedly specify this threshold
> for each source.
>
> 2) There is a smaller number of public APIs overall. In particular, instead
> of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API for
> every source operator that has even-time watermark lag defined, we only
> need to add one job-level config. Less public API means better simplicity
> and maintainability in general.
>
> On the other hand, per-source config will be necessary if users want to
> apply different watermark lag thresholds for different sources in the same
> job. Personally, I find this a bit counter-intuitive for users to specify
> different watermark lag thresholds in the same job.
>
> Do you think there is any real-word use-case that requires this? Could you
> provide a specific use-case where per-source config can provide an
> advantage over the job-level config?
>
>
> > I think it's not intuitive to combine it with the logical OR operation.
> > Even for the
> > combination logic of backlog status from different channels, FLIP-309
> said
> > it is
> > "up to the operator to determine its output records' isBacklog value" and
> > proposed
> > 3 different strategies. Therefore, I think backlog status from a single
> > source should
> > be up to the source.
>
>
> For both the job-level config and the per-source config, it is eventually
> up to the user to decide the computation logic of the backlog status.
> Whether this mechanism is implemented at the per-source level or framework
> level is probably more like an implementation detail.
>
> Eventually, I think the choice between these two approaches depends on
> whether we have any use-case for users to specify different watermark lag
> thresholds in the same job.
>
>
> >
> > IMO, a better API design is not how to resolve conflicts but not
> > introducing conflicts.
>
>
> Just to clarify, the current FLIP does not introduce any conflict. Each
> source can have its own rule that specifies when the backlog can be true
> (e.g. MySql CDC says the backlog should be true during the snapshot stage).
> And we can have a job-level config that specifies when the backlog should
> be true. Note that it is designed in such a way that none of these rules
> specify when the backlog should be false. That is why there is no conflict
> by definition.
>
>
>
> Let the source determine backlog status removes the conflicts and I don't
> > see big
> > disadvantages.
> >
> > > It should not confuse the user that
> > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > backlog.watermark-lag-threshold, as it is not a source.
> >
> > Hmm, so this configuration may confuse Flink SQL users, because all
> > watermarks
> > are defined on the source DDL, but it may use a separate operator to emit
> > watermarks
> > if the source doesn't support emitting watermarks.
> >
>
> If I understand your comments correctly, you mean that we might have a
> Flink SQL DDL with user-defined watermark expressions. And users also want
> to set the backlog to true if the watermark generated by that
> user-specified expression exceeds a threshold.
>
> That is a good point and use-case. I agree we should also cover this
> scenario. And we can update FLIP-328 to mention that the job-level config
> will also be applicable when the watermark derived from the Flink SQL DDL
> exceeds this threshold. Would this address your concern?
>
>
> >
> > > I think the description in the FLIP actually means the other way
> > > around, where the job can never switch back to batch mode once it has
> > > switched into streaming mode. This is to align with the current state
> > > of FLIP-327[1], where only switching from batch to stream mode is
> > > supported.
> >
> > This sounds like a limitation of FLIP-327 (that execution mode depends on
> > backlog status).
> > But the backlog status shouldn't have this limitation, because it is not
> > only used for execution
> > switching.
> >
>
> You are right that this is a limitation. However, this is only a short-term
> limitation which we added to make sure that we can focus on the capability
> to switch from backlog=true to backlog=false. In the future, we will remove
> this limitation and also support switching from backlog=false to
> backlog=true.
>
> The capability to switch from backlog=true to backlog=false will mitigate a
> lot of problems we are facing now. As it is common for users to start a
> Flink job to process backlog data followed by real-time data. On the other
> hand, switching from backlog=false to backlog=true is useful when there is
> a traffic spike while the Flink job is processing real-time data, which is
> also useful to address but less important than the previous one.
>
> Given that both features require considerable changes to the underlying
> runtime, we think it might be useful and safe to tackle them one by one.
>
> Thanks again for the comments. Please let us know what you think.
>
> Best,
> Dong
>
>
> >
> > Best,
> > Jark
> >
> >
> >
> > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi Jark and Leonard,
> > >
> > > Thanks for the comments. Please see my reply below.
> > >
> > > @Jark
> > >
> > > > I think a better API doesn't compete with itself. Therefore, I'm in
> > > favor of
> > > > supporting the watermark lag threshold for each source without
> > > introducing
> > > > any framework API and configuration.
> > >
> > > I don't think supporting the watermark lag threshold for each source
> > > can avoid the competition problem. In the case where a user wants to
> > > use a CDC source and also determine backlog status based on watermark
> > > lag, we still need to define the rule when that occurs. With that
> > > said, I think it is more intuitive to combine it with the logical OR
> > > operation, as the strategies (FLIP-309, FLIP-328) only determine when
> > > the source's backlog status should be True. What do you think?
> > >
> > > > Besides, this can address another concern that the watermark may be
> > > > generated by DataStream#assignTimestampsAnd
> > > > Watermarks which doesn't
> > > > work with the backlog.watermark-lag-threshold job config
> > >
> > > The description of the configuration explicitly states that "a source
> > > would report isProcessingBacklog=true if its watermark lag exceeds the
> > > configured value". It should not confuse the user that
> > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > backlog.watermark-lag-threshold, as it is not a source.
> > >
> > > > Does that mean the job can never back to streaming mode once switches
> > > into
> > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> possible
> > to
> > > > support switching back in this FLIP?
> > >
> > > I think the description in the FLIP actually means the other way
> > > around, where the job can never switch back to batch mode once it has
> > > switched into streaming mode. This is to align with the current state
> > > of FLIP-327[1], where only switching from batch to stream mode is
> > > supported.
> > >
> > > @Leonard
> > >
> > > > > The FLIP describe that: And it should report
> > isProcessingBacklog=false
> > > at the beginning of the snapshot stage.
> > > > This should be “changelog stage”
> > >
> > > I think the description is in FLIP-309. Thanks for pointing out. I
> > > updated the description.
> > >
> > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > Source. Although we are pushing the sourceFunction API to be removed,
> > these
> > > APIs will be survive one or two versions in flink repo before they are
> > > actually removed.
> > >
> > > I agree that it is good to support the SourceFunction API. However,
> > > given that the SourceFunction API is marked as deprecated, I think I
> > > will prioritize supporting the FLIP-27 Source. We can support the
> > > SourceFunction API after the
> > > FLIP-27 source. What do you think?
> > >
> > > Best regards,
> > > Xuannan
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > >
> > >
> > >
> > >
> > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com> wrote:
> > > >
> > > > Thanks Xuannan for driving this FLIP !
> > > >
> > > > The proposal generally looks good to me, but I still left some
> > comments:
> > > >
> > > > > One more question about the FLIP is that the FLIP says "Note that
> > this
> > > > > config does not support switching source's isProcessingBacklog from
> > > false to true
> > > > > for now.” Does that mean the job can never back to streaming mode
> > once
> > > switches into
> > > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> > possible
> > > to
> > > > > support switching back in this FLIP?
> > > > +1 for Jark’s concern, IIUC, the state transition of
> > IsProcessingBacklog
> > > depends on whether the data in the source is processing backlog data or
> > > not. Different sources will have different backlog status and which may
> > > change over time. From a general perspective, we should not have this
> > > restriction.
> > > >
> > > > > The FLIP describe that: And it should report
> > isProcessingBacklog=false
> > > at the beginning of the snapshot stage.
> > > > This should be “changelog stage”
> > > >
> > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > Source. Although we are pushing the sourceFunction API to be removed,
> > these
> > > APIs will be survive one or two versions in flink repo before they are
> > > actually removed.
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Thank you for all the reviews and suggestions.
> > > > >>
> > > > >> I believe all the comments have been addressed. If there are no
> > > > >> further comments, I plan to open the voting thread for this FLIP
> > early
> > > > >> next week.
> > > > >>
> > > > >> Best regards,
> > > > >> Xuannan
> > > > >>
> > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> <j...@ververica.com.invalid
> > >
> > > > >> wrote:
> > > > >>>
> > > > >>> Hi Xuannan,
> > > > >>>
> > > > >>> I thought FLIP-328 will compete with FLIP-309 while setting the
> > > value of
> > > > >>> the backlog. Understood. Thanks for the hint.
> > > > >>>
> > > > >>> Best regards,
> > > > >>> Jing
> > > > >>>
> > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> suxuanna...@gmail.com>
> > > > >> wrote:
> > > > >>>
> > > > >>>> Hi Jing,
> > > > >>>>
> > > > >>>> Thank you for the clarification.
> > > > >>>>
> > > > >>>> For the use case you mentioned, I believe we can utilize the
> > > > >>>> HybridSource, as updated in FLIP-309[1], to determine the
> backlog
> > > > >>>> status. For example, if the user wants to process data before
> > time T
> > > > >>>> in batch mode and after time T in stream mode, they can set the
> > > first
> > > > >>>> source of the HybridSource to read up to time T and the last
> > source
> > > of
> > > > >>>> the HybridSource to read from time T.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Xuannan
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > >>>>
> > > > >>>>
> > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> > <j...@ververica.com.invalid
> > > >
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>> Hi Xuannan,
> > > > >>>>>
> > > > >>>>> Thanks for the clarification.
> > > > >>>>>
> > > > >>>>> 3. Event time and process time are two different things. It
> might
> > > be
> > > > >>>> rarely
> > > > >>>>> used, but conceptually, users can process data in the past
> > within a
> > > > >>>>> specific time range in the streaming mode. All data before that
> > > range
> > > > >>>> will
> > > > >>>>> be considered as backlog and needed to be processed in the
> batch
> > > > >> mode,
> > > > >>>>> like, e.g. the Present Perfect Progressive tense used in
> English
> > > > >>>> language.
> > > > >>>>>
> > > > >>>>> Best regards,
> > > > >>>>> Jing
> > > > >>>>>
> > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> > suxuanna...@gmail.com>
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Jing,
> > > > >>>>>>
> > > > >>>>>> Thanks for the reply.
> > > > >>>>>>
> > > > >>>>>> 1. You are absolutely right that the watermark lag threshold
> > must
> > > > >> be
> > > > >>>>>> carefully set with a thorough understanding of watermark
> > > > >> generation.
> > > > >>>> It is
> > > > >>>>>> crucial for users to take into account the WatermarkStrategy
> > when
> > > > >>>> setting
> > > > >>>>>> the watermark lag threshold.
> > > > >>>>>>
> > > > >>>>>> 2. Regarding pure processing-time based stream processing
> jobs,
> > > > >>>>>> alternative strategies will be implemented to determine
> whether
> > > the
> > > > >>>> job is
> > > > >>>>>> processing backlog data. I have outlined two possible
> strategies
> > > > >> below:
> > > > >>>>>>
> > > > >>>>>> - Based on the source operator's state. For example, when
> MySQL
> > > CDC
> > > > >>>> source
> > > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > > >>>>>> - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > > >>>>>> backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > > >>>>>> isBacklog=true.
> > > > >>>>>>
> > > > >>>>>> As of the strategies proposed in this FLIP, it rely on
> generated
> > > > >>>>>> watermarks. Therefore, if a user intends for the job to detect
> > > > >> backlog
> > > > >>>>>> status based on watermark, it is necessary to generate the
> > > > >> watermark.
> > > > >>>>>>
> > > > >>>>>> 3. I'm afraid I'm not fully grasping your question. From my
> > > > >>>> understanding,
> > > > >>>>>> it should work in both cases. When event times are close to
> the
> > > > >>>> processing
> > > > >>>>>> time, resulting in watermarks close to the processing time,
> the
> > > > >> job is
> > > > >>>> not
> > > > >>>>>> processing backlog data. On the other hand, when event times
> are
> > > > >> far
> > > > >>>> from
> > > > >>>>>> processing time, causing watermarks to also be distant, if the
> > lag
> > > > >>>>>> surpasses the defined threshold, the job is considered
> > processing
> > > > >>>> backlog
> > > > >>>>>> data.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Xuannan
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> <j...@ververica.com.INVALID
> > >
> > > > >>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>> Hi Xuannan,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for the clarification. That is the part where I am
> > trying
> > > > >> to
> > > > >>>>>>> understand your thoughts. I have some follow-up questions:
> > > > >>>>>>>
> > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and how
> > > > >> customized
> > > > >>>>>>> watermark generation looks like. It mixes business logic with
> > > > >>>> technical
> > > > >>>>>>> implementation and technical data processing mode. The value
> of
> > > > >> the
> > > > >>>>>>> watermark lag threshold must be set very carefully. If the
> > value
> > > > >> is
> > > > >>>> too
> > > > >>>>>>> small. any time, when the watermark generation logic is
> > > > >>>> changed(business
> > > > >>>>>>> logic changes lead to the threshold getting exceeded), the
> same
> > > > >> job
> > > > >>>> might
> > > > >>>>>>> be running surprisingly in backlog processing mode, i.e. a
> > > > >> butterfly
> > > > >>>>>>> effect. A comprehensive documentation is required to avoid
> any
> > > > >>>> confusion
> > > > >>>>>>> for the users.
> > > > >>>>>>> 2. Like Jark already mentioned, use cases that do not have
> > > > >>>> watermarks,
> > > > >>>>>>> like pure processing-time based stream processing[1] are not
> > > > >>>> covered. It
> > > > >>>>>> is
> > > > >>>>>>> more or less a trade-off solution that does not support such
> > use
> > > > >>>> cases
> > > > >>>>>> and
> > > > >>>>>>> appropriate documentation is required. Forcing them to
> > explicitly
> > > > >>>>>> generate
> > > > >>>>>>> watermarks that are never needed just because of this does
> not
> > > > >> sound
> > > > >>>>>> like a
> > > > >>>>>>> proper solution.
> > > > >>>>>>> 3. If I am not mistaken, it only works for use cases where
> > event
> > > > >>>> times
> > > > >>>>>> are
> > > > >>>>>>> very close to the processing times, because the wall clock is
> > > > >> used to
> > > > >>>>>>> calculate the watermark lag and the watermark is generated
> > based
> > > > >> on
> > > > >>>> the
> > > > >>>>>>> event time.
> > > > >>>>>>>
> > > > >>>>>>> Best regards,
> > > > >>>>>>> Jing
> > > > >>>>>>>
> > > > >>>>>>> [1]
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > >>>>>>>
> > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > > >> suxuanna...@gmail.com>
> > > > >>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Jing,
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you for the suggestion.
> > > > >>>>>>>>
> > > > >>>>>>>> The definition of watermark lag is the same as the
> > watermarkLag
> > > > >>>> metric
> > > > >>>>>> in
> > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag calculation
> > is
> > > > >>>>>> computed at
> > > > >>>>>>>> the time when a watermark is emitted downstream in the
> > following
> > > > >>>> way:
> > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added this
> > > > >>>> description to
> > > > >>>>>>>> the FLIP.
> > > > >>>>>>>>
> > > > >>>>>>>> I hope this addresses your concern.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Xuannan
> > > > >>>>>>>>
> > > > >>>>>>>> [1]
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> > <j...@ververica.com.INVALID
> > > > >>>
> > > > >>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Hi Xuannan,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > > >>>>>>>>>
> > > > >>>>>>>>> There is one tiny thing that I am not sure if I understand
> it
> > > > >>>>>> correctly.
> > > > >>>>>>>>> Since there will be many different WatermarkStrategies and
> > > > >>>> different
> > > > >>>>>>>>> WatermarkGenerators. Could you please update the FLIP and
> add
> > > > >> the
> > > > >>>>>>>>> description of how the watermark lag is calculated exactly?
> > > > >> E.g.
> > > > >>>>>>>> Watermark
> > > > >>>>>>>>> lag = A - B with A is the timestamp of the watermark
> emitted
> > > > >> to the
> > > > >>>>>>>>> downstream and B is....(this is the part I am not really
> sure
> > > > >> after
> > > > >>>>>>>> reading
> > > > >>>>>>>>> the FLIP).
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best regards,
> > > > >>>>>>>>> Jing
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > > >> suxuanna...@gmail.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Jark,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks for the comments.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I agree that the current solution cannot support jobs that
> > > > >> cannot
> > > > >>>>>> define
> > > > >>>>>>>>>> watermarks. However, after considering the
> > > > >> pending-record-based
> > > > >>>>>>>> solution, I
> > > > >>>>>>>>>> believe the current solution is superior for the target
> use
> > > > >> case
> > > > >>>> as it
> > > > >>>>>>>> is
> > > > >>>>>>>>>> more intuitive for users. The backlog status gives users
> the
> > > > >>>> ability
> > > > >>>>>> to
> > > > >>>>>>>>>> balance between throughput and latency. Making this
> > trade-off
> > > > >>>> decision
> > > > >>>>>>>>>> based on the watermark lag is more intuitive from the
> user's
> > > > >>>>>>>> perspective.
> > > > >>>>>>>>>> For instance, a user can decide that if the job lags
> behind
> > > > >> the
> > > > >>>>>> current
> > > > >>>>>>>>>> time by more than 1 hour, the result is not usable. In
> that
> > > > >> case,
> > > > >>>> we
> > > > >>>>>> can
> > > > >>>>>>>>>> optimize for throughput when the data lags behind by more
> > > > >> than an
> > > > >>>>>> hour.
> > > > >>>>>>>>>> With the pending-record-based solution, it's challenging
> for
> > > > >>>> users to
> > > > >>>>>>>>>> determine when to optimize for throughput and when to
> > > > >> prioritize
> > > > >>>>>>>> latency.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Regarding the limitations of the watermark-based solution:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 1. The current solution can support jobs with sources that
> > > > >> have
> > > > >>>> event
> > > > >>>>>>>>>> time. Users can always define a watermark at the source
> > > > >> operator,
> > > > >>>> even
> > > > >>>>>>>> if
> > > > >>>>>>>>>> it's not used by downstream operators, such as streaming
> > join
> > > > >> and
> > > > >>>>>>>> unbounded
> > > > >>>>>>>>>> aggregate.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 2.I don't believe it's accurate to say that the watermark
> > lag
> > > > >> will
> > > > >>>>>> keep
> > > > >>>>>>>>>> increasing if no data is generated in Kafka. The watermark
> > > > >> lag and
> > > > >>>>>>>> backlog
> > > > >>>>>>>>>> status are determined at the moment when the watermark is
> > > > >> emitted
> > > > >>>> to
> > > > >>>>>> the
> > > > >>>>>>>>>> downstream operator. If no data is emitted from the
> source,
> > > > >> the
> > > > >>>>>>>> watermark
> > > > >>>>>>>>>> lag and backlog status will not be updated. If the
> > > > >>>> WatermarkStrategy
> > > > >>>>>>>> with
> > > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it
> > > > >> becomes
> > > > >>>> idle.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to determine
> if a
> > > > >> job
> > > > >>>> is
> > > > >>>>>>>>>> processing backlog data. Even when using pending records,
> it
> > > > >>>> faces a
> > > > >>>>>>>>>> similar issue. For example, if the source has 1K pending
> > > > >> records,
> > > > >>>>>> those
> > > > >>>>>>>>>> records can span from 1 day  to 1 hour to 1 second. If the
> > > > >> records
> > > > >>>>>> span
> > > > >>>>>>>> 1
> > > > >>>>>>>>>> day, it's probably best to optimize for throughput. If
> they
> > > > >> span 1
> > > > >>>>>>>> hour, it
> > > > >>>>>>>>>> depends on the business logic. If they span 1 second,
> > > > >> optimizing
> > > > >>>> for
> > > > >>>>>>>>>> latency is likely the better choice.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> In summary, I believe the watermark-based solution is a
> > > > >> superior
> > > > >>>>>> choice
> > > > >>>>>>>>>> for the target use case where watermark/event time can be
> > > > >> defined.
> > > > >>>>>>>>>> Additionally, I haven't come across a scenario that
> requires
> > > > >>>>>> low-latency
> > > > >>>>>>>>>> processing and reads from a source that cannot define
> > > > >> watermarks.
> > > > >>>> If
> > > > >>>>>> we
> > > > >>>>>>>>>> encounter such a use case, we can create another FLIP to
> > > > >> address
> > > > >>>> those
> > > > >>>>>>>>>> needs in the future. What do you think?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best,
> > > > >>>>>>>>>> Xuannan
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com
> > > > >> <mailto:
> > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Hi Xuannan,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for opening this discussion.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> This current proposal may work in the mentioned watermark
> > > > >> cases.
> > > > >>>>>>>>>>> However, it seems this is not a general solution for
> > sources
> > > > >> to
> > > > >>>>>>>> determine
> > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > >>>>>>>>>>> From my point of view, there are 3 limitations of the
> > current
> > > > >>>>>> proposal:
> > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> > watermark/event-time
> > > > >>>>>> defined,
> > > > >>>>>>>>>>> for example streaming join and unbounded aggregate. We
> may
> > > > >> still
> > > > >>>> need
> > > > >>>>>>>> to
> > > > >>>>>>>>>>> figure out solutions for them.
> > > > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it increases
> > > > >>>> unlimited
> > > > >>>>>> if
> > > > >>>>>>>> no
> > > > >>>>>>>>>>> data is generated in the Kafka.
> > > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > > >>>>>>>>>>> 3. Watermark lag is hard to reflect the amount of
> backlog.
> > > > >> If the
> > > > >>>>>>>>>> watermark
> > > > >>>>>>>>>>> lag is 1day or 1 hour or 1second,
> > > > >>>>>>>>>>> there is possibly only 1 pending record there, which
> means
> > no
> > > > >>>> backlog
> > > > >>>>>>>> at
> > > > >>>>>>>>>>> all.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal metric used
> > to
> > > > >>>>>> determine
> > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > >>>>>>>>>>> What we need is something that reflects the number of
> > records
> > > > >>>>>>>> unprocessed
> > > > >>>>>>>>>>> by the job.
> > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric proposed in
> > > > >>>> FLIP-33 and
> > > > >>>>>>>> has
> > > > >>>>>>>>>>> been implemented by Kafka source.
> > > > >>>>>>>>>>> Did you consider using "pendingRecords" metric to
> determine
> > > > >>>>>>>>>>> "isProcessingBacklog"?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Jark
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> [1]
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >>>>>>>>>> <
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > > >>>> tonysong...@gmail.com
> > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Sounds good to me.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> It is true that, if we are introducing the generalized
> > > > >>>> watermark,
> > > > >>>>>>>> there
> > > > >>>>>>>>>>>> will be other watermark related concepts /
> configurations
> > > > >> that
> > > > >>>> need
> > > > >>>>>> to
> > > > >>>>>>>>>> be
> > > > >>>>>>>>>>>> updated anyway.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Xintong
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > > >>>> suxuanna...@gmail.com
> > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi Xingtong,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> After considering the idea of using a general
> > configuration
> > > > >>>> key, I
> > > > >>>>>>>>>> think
> > > > >>>>>>>>>>>>> it may not be a good idea for the reasons below.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> While I agree that using a more general configuration
> key
> > > > >>>> provides
> > > > >>>>>> us
> > > > >>>>>>>>>>>> with
> > > > >>>>>>>>>>>>> the flexibility to switch to other approaches to
> > calculate
> > > > >> the
> > > > >>>> lag
> > > > >>>>>> in
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>> future, the downside is that it may cause confusion for
> > > > >> users.
> > > > >>>> We
> > > > >>>>>>>>>>>> currently
> > > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> > watermarkLag
> > > > >> in
> > > > >>>> the
> > > > >>>>>>>>>> source,
> > > > >>>>>>>>>>>>> and it is not clear which specific lag we are referring
> > to.
> > > > >>>> With
> > > > >>>>>> the
> > > > >>>>>>>>>>>>> potential introduction of the Generalized Watermark
> > > > >> mechanism
> > > > >>>> in
> > > > >>>>>> the
> > > > >>>>>>>>>>>>> future, if I understand correctly, a watermark won't
> > > > >>>> necessarily
> > > > >>>>>> need
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>> be
> > > > >>>>>>>>>>>>> a timestamp. I am concern that the general
> configuration
> > > > >> key
> > > > >>>> may
> > > > >>>>>> not
> > > > >>>>>>>>>> be
> > > > >>>>>>>>>>>>> enough to cover all the use case and we will need to
> > > > >> introduce
> > > > >>>> a
> > > > >>>>>>>>>> general
> > > > >>>>>>>>>>>>> way to determine the backlog status regardless.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the
> > > > >> configuration
> > > > >>>> as
> > > > >>>>>> is,
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>> change it later with the a deprecation process or
> > migration
> > > > >>>>>> process.
> > > > >>>>>>>>>> What
> > > > >>>>>>>>>>>>> do you think?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Xuannan
> > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > > >>>> tonysong...@gmail.com
> > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail
> via
> > > > >> the
> > > > >>>>>>>>>>>>> configuration
> > > > >>>>>>>>>>>>>> option. To be specific, I suggest not mentioning the
> > > > >>>> "watermark"
> > > > >>>>>>>>>>>> keyword
> > > > >>>>>>>>>>>>> in
> > > > >>>>>>>>>>>>>> the configuration key and description.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - From the users' perspective, I think they only need
> to
> > > > >> know
> > > > >>>>>>>> there's
> > > > >>>>>>>>>> a
> > > > >>>>>>>>>>>>>> lag higher than the given threshold, Flink will
> consider
> > > > >>>> latency
> > > > >>>>>> of
> > > > >>>>>>>>>>>>>> individual records as less important and prioritize
> > > > >> throughput
> > > > >>>>>> over
> > > > >>>>>>>>>> it.
> > > > >>>>>>>>>>>>>> They don't really need the details of how the lags are
> > > > >>>> calculated.
> > > > >>>>>>>>>>>>>> - For the internal implementation, I also think using
> > > > >>>> watermark
> > > > >>>>>> lags
> > > > >>>>>>>>>> is
> > > > >>>>>>>>>>>>>> a good idea, for the reasons you've already mentioned.
> > > > >>>> However,
> > > > >>>>>> it's
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>> the only possible option. Hiding this detail from
> users
> > > > >> would
> > > > >>>> give
> > > > >>>>>>>> us
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> flexibility to switch to other approaches if needed in
> > > > >> future.
> > > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > > >> ProcessFunction
> > > > >>>> API
> > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an idea
> to
> > > > >>>>>> introduce a
> > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically the
> > > > >>>> watermark can
> > > > >>>>>>>> be
> > > > >>>>>>>>>>>>>> anything that needs to travel along the data-flow with
> > > > >> certain
> > > > >>>>>>>>>>>> alignment
> > > > >>>>>>>>>>>>>> strategies, and event time watermark would be one
> > specific
> > > > >>>> case of
> > > > >>>>>>>> it.
> > > > >>>>>>>>>>>>> This
> > > > >>>>>>>>>>>>>> is still an idea and has not been discussed and agreed
> > on
> > > > >> by
> > > > >>>> the
> > > > >>>>>>>>>>>>> community,
> > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we are
> going
> > > > >> for
> > > > >>>> it,
> > > > >>>>>> the
> > > > >>>>>>>>>>>>> concept
> > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also be
> > > > >> fine
> > > > >>>> with
> > > > >>>>>>>>>>>>>> introducing the configuration as is, and changing it
> > > > >> later, if
> > > > >>>>>>>> needed,
> > > > >>>>>>>>>>>>> with
> > > > >>>>>>>>>>>>>> a regular deprecation and migration process. Just
> making
> > > > >> my
> > > > >>>>>>>>>>>> suggestions.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Xintong
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > > >>>>>> suxuanna...@gmail.com
> > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hi Xintong,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I have considered using the timestamp in the records
> to
> > > > >>>> determine
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>>> backlog status, and decided to use watermark at the
> > end.
> > > > >> By
> > > > >>>>>>>>>>>> definition,
> > > > >>>>>>>>>>>>>>> watermark is the time progress indication in the data
> > > > >>>> stream. It
> > > > >>>>>>>>>>>>> indicates
> > > > >>>>>>>>>>>>>>> the stream’s event time has progressed to some
> specific
> > > > >>>> time. On
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>> other
> > > > >>>>>>>>>>>>>>> hand, timestamp in the records is usually used to
> > > > >> generate
> > > > >>>> the
> > > > >>>>>>>>>>>>> watermark.
> > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and intuitive
> to
> > > > >>>> calculate
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>> event
> > > > >>>>>>>>>>>>>>> time lag by watermark and determine the backlog
> status.
> > > > >> And
> > > > >>>> by
> > > > >>>>>>>> using
> > > > >>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> watermark, we can easily deal with the out-of-order
> and
> > > > >> the
> > > > >>>>>>>> idleness
> > > > >>>>>>>>>>>>> of the
> > > > >>>>>>>>>>>>>>> data.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Please let me know if you have further questions.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>> Xuannan
> > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > > >>>>>> tonysong...@gmail.com
> > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> +1 in general.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> A quick question, could you explain why we are
> relying
> > > > >> on
> > > > >>>> the
> > > > >>>>>>>>>>>>> watermark
> > > > >>>>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use
> timestamps
> > > > >> in the
> > > > >>>>>>>>>>>>> records? I
> > > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks. Just
> > > > >> wondering if
> > > > >>>>>>>>>>>> there's
> > > > >>>>>>>>>>>>> any
> > > > >>>>>>>>>>>>>>>> deep considerations behind this.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Xintong
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > > >>>>>> suxuanna...@gmail.com
> > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow
> > > > >> source
> > > > >>>>>>>>>>>>> operators to
> > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on watermark
> > > > >> lag[1]. We
> > > > >>>>>> had a
> > > > >>>>>>>>>>>>>>> several
> > > > >>>>>>>>>>>>>>>>> discussions with Dong Ling about the design, and
> > thanks
> > > > >>>> for all
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>> valuable advice.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case where user
> want
> > to
> > > > >>>> run a
> > > > >>>>>>>>>>>> Flink
> > > > >>>>>>>>>>>>>>> job to
> > > > >>>>>>>>>>>>>>>>> backfill historical data in a high throughput
> manner
> > > > >> and
> > > > >>>>>> continue
> > > > >>>>>>>>>>>>>>>>> processing real-time data with low latency.
> Building
> > > > >> upon
> > > > >>>> the
> > > > >>>>>>>>>>>>> backlog
> > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this proposal
> > > > >> enables
> > > > >>>>>> sources
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>> report
> > > > >>>>>>>>>>>>>>>>> their status of processing backlog based on the
> > > > >> watermark
> > > > >>>> lag.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or
> feedback
> > > > >> you
> > > > >>>> may
> > > > >>>>>> have
> > > > >>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>>> proposal.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>>>> Xuannan
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> [1]
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > >>>>>>>>>> <
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> [2]
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > >>>>>>>>>> <
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>
> > > >
> > >
> >
>

Reply via email to