Hi all,

I have updated FLIP-309 as suggested by Piotr to include a reference to
FLIP-328 in the future work section.

Piotr, Stephan, and I discussed offline regarding the choice
between execution.checkpointing.max-interval and
execution.checkpointing.interval-during-backlog.
The advantage of using "max-interval" is that Flink runtime can have more
flexibility to decide when/how to adjust checkpointing intervals (based on
information other than backlog). The advantage of using
"interval-during-backlog" is that it is clearer to the user when/how this
configured interval is used. Since there is no immediate need for the extra
flexibility as of this FLIP, we agreed to go with interval-during-backlog
for now. And we can rename this config to e.g.
execution.checkpointing.max-interval when needed in the future.
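
To make the agreed semantics concrete, here is a minimal sketch in plain Java.
It is only an illustration: the class and method names are hypothetical (they
are not Flink APIs), and the 30 second / 10 minute values are made-up examples.
It just shows that the backlog interval is used while the job is processing
backlog, and the regular interval otherwise.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of Flink's API. */
final class BacklogIntervalSketch {

    // Picks the checkpoint interval for the current backlog status.
    static Duration effectiveInterval(
            boolean isProcessingBacklog,
            Duration interval,
            Duration intervalDuringBacklog) {
        return isProcessingBacklog ? intervalDuringBacklog : interval;
    }

    public static void main(String[] args) {
        // Assumed example values for execution.checkpointing.interval and
        // execution.checkpointing.interval-during-backlog.
        Duration regular = Duration.ofSeconds(30);
        Duration duringBacklog = Duration.ofMinutes(10);

        System.out.println(effectiveInterval(true, regular, duringBacklog));
        System.out.println(effectiveInterval(false, regular, duringBacklog));
    }
}
```

With a name like interval-during-backlog, the only input to this decision is
the backlog status, which is the clarity argument made above.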

Thanks everyone for all the reviews and suggestions! And special thanks to
Piotr and Stephan for taking extra time to provide detailed reviews and
suggestions offline!

Since there is no further comment, I will open the voting thread for this
FLIP.

Cheers,
Dong


On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi All,
>
> We had a lot of off-line discussions. As a result I would suggest dropping
> the idea of introducing an end-to-end-latency concept, until
> we can properly implement it, which will require more designing and
> experimenting. I would suggest starting with a more manual solution,
> where the user needs to configure concrete parameters, like
> `execution.checkpointing.max-interval` or `execution.flush-interval`.
>
> FLIP-309 looks good to me, I would just rename
> `execution.checkpointing.interval-during-backlog` to
> `execution.checkpointing.max-interval`.
>
> I would also reference, as future work, that a solution allowing sources
> like Kafka to set `isProcessingBacklog` will be introduced via
> FLIP-328 [1].
>
> Best,
> Piotrek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>
> On Wed, Jul 12, 2023 at 03:49 Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > I think I understand your motivation for suggesting
> > execution.slow-end-to-end-latency now. Please see my followup comments
> > (after the previous email) inline.
> >
> > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the updates, a couple of comments:
> > >
> > > > If a record is generated by a source when the source's
> > > isProcessingBacklog is true, or some of the records used to
> > > > derive this record (by an operator) has isBacklog = true, then this
> > > record should have isBacklog = true. Otherwise,
> > > > this record should have isBacklog = false.
> > >
> > > nit:
> > > I think this conflicts with "Rule of thumb for non-source operators to
> > set
> > > isBacklog = true for the records it emits:"
> > > section later on, when it comes to a case if an operator has mixed
> > > isBacklog = false and isBacklog = true inputs.
> > >
> > > > execution.checkpointing.interval-during-backlog
> > >
> > > Do we need to define this as an interval config parameter? Won't that
> add
> > > an option that will be almost instantly deprecated
> > > because what we actually would like to have is:
> > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > >
> >
> > I guess you are suggesting that we should allow users to specify a higher
> > end-to-end latency budget for those records that are emitted by two-phase
> > commit sink, than those records that are emitted by non-two-phase commit
> > sink.
> >
> > My concern with this approach is that it will increase the complexity of
> > the definition of "processing latency requirement", as well as the
> > complexity of the Flink runtime code that handles it. Currently, the
> > FLIP-325 defines end-to-end latency as an attribute of the records that
> is
> > statically assigned when the record is generated at the source,
> regardless
> > of how it will be emitted later in the topology. If we make the changes
> > proposed above, we would need to define the latency requirement w.r.t.
> the
> > attribute of the operators that it travels through before its result is
> > emitted, which is less intuitive and more complex.
> >
> > For now, it is not clear whether it is necessary to have two categories
> of
> > latency requirement for the same job. Maybe it is reasonable to assume
> that
> > if a job has two-phase commit sink and the user is OK to emit some
> results
> > at 1 minute interval, then more likely than not the user is also OK to
> emit
> > all results at 1 minute interval, including those that go through
> > non-two-phase commit sink?
> >
> > If we do want to support different end-to-end latency depending on
> whether
> > the record is emitted by a two-phase commit sink, I would prefer to still
> > use execution.checkpointing.interval-during-backlog instead of
> > execution.slow-end-to-end-latency. This allows us to keep the concept of
> > end-to-end latency simple. Also, by explicitly including "checkpointing
> > interval" in the name of the config that directly affects checkpointing
> > interval, we can make it easier and more intuitive for users to
> understand
> > the impact and set proper value for such configs.
> >
> > What do you think?
> >
> > Best,
> > Dong
> >
> >
> > > Maybe we can introduce only `execution.slow-end-to-end-latency` (%
> maybe
> > a
> > > better name), and for the time being
> > > use it as the checkpoint interval value during backlog?
> >
> >
> > > Or do you envision that in the future users will be configuring only:
> > > - execution.end-to-end-latency
> > > and only optionally:
> > > - execution.checkpointing.interval-during-backlog
> > > ?
> > >
> > > Best Piotrek
> > >
> > > PS, I will read the summary that you have just published later, but I
> > think
> > > we don't need to block this FLIP on the
> > > existence of that high level summary.
> > >
> > > On Tue, Jul 11, 2023 at 17:49 Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Piotr and everyone,
> > > >
> > > > I have documented the vision with a summary of the existing work in
> > this
> > > > doc. Please feel free to review/comment/edit this doc. Looking
> forward
> > to
> > > > working with you together in this line of work.
> > > >
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski <
> > piotr.nowoj...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Dong and I chatted offline about the above-mentioned issues (thanks
> > > > > for that offline chat, I think it helped both of us a lot). The
> > > > > summary is below.
> > > > >
> > > > > > Previously, I thought you meant to add a generic logic in
> > > > > SourceReaderBase
> > > > > > to read existing metrics (e.g. backpressure) and emit the
> > > > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I
> have
> > > > > > misunderstood your suggestions.
> > > > > >
> > > > > > After double-checking your previous suggestion, I am wondering if
> > you
> > > > are
> > > > > > OK with the following approach:
> > > > > >
> > > > > > - Add a job-level config
> > > > execution.checkpointing.interval-during-backlog
> > > > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > > > isProcessingBacklog).
> > > > > > - When this API is invoked, it internally sends an
> > > > > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > > > > - SourceCoordinator should keep track of the latest
> > > isProcessingBacklog
> > > > > > status from all its subtasks. And for now, we will hardcode the
> > logic
> > > > > such
> > > > > > that if any source reader says it is under backlog, then
> > > > > > execution.checkpointing.interval-during-backlog is used.
> > > > > >
> > > > > > This approach looks good to me as it can achieve the same
> > performance
> > > > > with
> > > > > > the same number of public APIs for the target use-case. And I
> > suppose
> > > > in
> > > > > > the future we might be able to re-use this API for source reader
> to
> > > set
> > > > > its
> > > > > > backlog status based on its backpressure metrics, which could be
> an
> > > > extra
> > > > > > advantage over the current approach.
> > > > > >
> > > > > > Do you think we can agree to adopt the approach described above?
> > > > >
> > > > > Yes, I think that's a viable approach. I would be perfectly fine to
> > not
> > > > > introduce
> > > > > `SourceReaderContext#setProcessingBacklog(boolean
> > > isProcessingBacklog).`
> > > > > and sending the `SourceReaderBacklogEvent` from SourceReader to JM
> > > > > in this FLIP. It could be implemented once we would decide to add
> > some
> > > > more
> > > > > generic
> > > > > ways of detecting backlog/backpressure on the SourceReader level.
> > > > >
> > > > > I think we could also just keep the current proposal of adding
> > > > > `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the
> > > > sources
> > > > > that
> > > > > can set it on the `SplitEnumerator` level. Later we could merge
> this
> > > with
> > > > > other
> > > > > mechanisms of detecting "isProcessingBacklog", like based on
> > watermark
> > > > lag,
> > > > > backpressure, etc, via some component running on the JM.
> > > > >
> > > > > At the same time I'm fine with having the "isProcessingBacklog"
> > concept
> > > > to
> > > > > switch
> > > > > runtime back and forth between high and low latency modes instead
> of
> > > > > "backpressure". In FLIP-325 I have asked:
> > > > >
> > > > > > I think there is one thing that has been discussed neither
> here
> > > nor
> > > > in
> > > > > FLIP-309. Given that we have
> > > > > > three dimensions:
> > > > > > - e2e latency/checkpointing interval
> > > > > > - enabling some kind of batching/buffering on the operator level
> > > > > > - how much resources we want to allocate to the job
> > > > > >
> > > > > > How do we want Flink to adjust itself between those three? For
> > > example:
> > > > > > a) Should we assume that given Job has a fixed amount of assigned
> > > > > resources and make it paramount that
> > > > > >   Flink doesn't exceed those available resources? So in case of
> > > > > backpressure, we
> > > > > >   should extend checkpointing intervals, emit records less
> > frequently
> > > > and
> > > > > in batches.
> > > > > > b) Or should we assume that the amount of resources is flexible
> (up
> > > to
> > > > a
> > > > > point?), and the desired e2e latency
> > > > > >   is the paramount aspect? So in case of backpressure, we should
> > > still
> > > > > adhere to the configured e2e latency,
> > > > > >   and wait for the user or autoscaler to scale up the job?
> > > > > >
> > > > > > In case of a), I think the concept of "isProcessingBacklog" is
> not
> > > > > needed, we could steer the behaviour only
> > > > > > using the backpressure information.
> > > > > >
> > > > > > On the other hand, in case of b), "isProcessingBacklog"
> information
> > > > might
> > > > > be helpful, to let Flink know that
> > > > > > we can safely decrease the e2e latency/checkpoint interval even
> if
> > > > there
> > > > > is no backpressure, to use fewer
> > > > > > resources (and let the autoscaler scale down the job).
> > > > > >
> > > > > > Do we want to have both, or only one of those? Do a) and b)
> > > complement
> > > > > one another? If job is backpressured,
> > > > > > we should follow a) and expose to autoscaler/users information
> > "Hey!
> > > > I'm
> > > > > barely keeping up! I need more resources!".
> > > > > > While, when there is no backpressure and latency doesn't matter
> > > > > (isProcessingBacklog=true), we can limit the resource
> > > > > > usage
> > > > >
> > > > > After thinking this over:
> > > > > - the case that we don't have "isProcessingBacklog" information,
> but
> > > the
> > > > > source operator is
> > > > >   back pressured, must be intermittent. Either back pressure will
> go
> > > > away,
> > > > > or shortly we should
> > > > >   reach the "isProcessingBacklog" state anyway
> > > > > - and even if we implement some back pressure detecting algorithm
> to
> > > > switch
> > > > > the runtime into the
> > > > >   "high latency mode", we can always report that as
> > > "isProcessingBacklog"
> > > > > anyway, as runtime should
> > > > >    react the same way in both cases (backpressure and
> > > > "isProcessingBacklog"
> > > > > states).
> > > > >
> > > > > ===============
> > > > >
> > > > > With a common understanding of the final solution that we want to
> > have
> > > in
> > > > > the future, I'm pretty much fine with the current
> > > > > FLIP-309 proposal, with a couple of remarks:
> > > > > 1. Could you include in the FLIP-309 the long term solution as we
> > have
> > > > > discussed.
> > > > >         a) Would be nice to have some diagram showing how the
> > > > > "isProcessingBacklog" information would be travelling,
> > > > >              being aggregated and what will be done with that
> > > > information.
> > > > > (from SourceReader/SplitEnumerator to some
> > > > >             "component" aggregating it, and then ... ?)
> > > > > 2. For me "processing backlog" doesn't necessarily equate to
> > > > "backpressure"
> > > > > (HybridSource can be
> > > > >     both NOT backpressured and processing backlog at the same
> time).
> > If
> > > > you
> > > > > think the same way, can you include that
> > > > >     definition of "processing backlog" in the FLIP including its
> > > relation
> > > > > to the backpressure state? If not, we need to align
> > > > >     on that definition first :)
> > > > >
> > > > > Also I'm missing a big picture description, that would show what
> are
> > > you
> > > > > trying to achieve and what's the overarching vision
> > > > > behind all of the current and future FLIPs that you are planning in
> > > this
> > > > > area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
> > > > > Or was it described somewhere and I've missed it?
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jul 6, 2023 at 06:25 Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Piotr,
> > > > > >
> > > > > > I am sorry if you feel unhappy or upset with us for not
> > > > following/fixing
> > > > > > your proposal. It is not my intention to give you this feeling.
> > After
> > > > > all,
> > > > > > we are all trying to make Flink better, to support more use-case
> > with
> > > > the
> > > > > > most maintainable code. I hope you understand that just like
> you, I
> > > > have
> > > > > > also been doing my best to think through various design options
> and
> > > > > taking
> > > > > > time to evaluate the pros/cons. Eventually, we probably still need
> > to
> > > > > reach
> > > > > > consensus by clearly listing and comparing the objective
> pros/cons
> > of
> > > > > > different proposals and identifying the best choice.
> > > > > >
> > > > > > Regarding your concern (or frustration) that we are always
> finding
> > > > issues
> > > > > > in your proposal, I would say it is normal (and probably
> necessary)
> > > for
> > > > > > developers to find pros/cons in each other's solutions, so that
> we
> > > can
> > > > > > eventually pick the right one. I will appreciate anyone who can
> > > > correctly
> > > > > > pinpoint the concrete issue in my proposal so that I can improve
> it
> > > or
> > > > > > choose an alternative solution.
> > > > > >
> > > > > > Regarding your concern that we are not spending enough effort to
> > find
> > > > > > solutions and that the problem in your solution can be solved in
> a
> > > > > minute,
> > > > > > I would like to say that is not true. For each of your previous
> > > > > proposals,
> > > > > > I typically spent 1+ hours thinking through your proposal to
> > > understand
> > > > > > whether it works and why it does not work, and another 1+ hour to
> > > write
> > > > > > down the details and explain why it does not work. And I have
> had a
> > > > > variety
> > > > > > of offline discussions with my colleagues discussing various
> > > proposals
> > > > > > (including yours) with 6+ hours in total. Maybe I am not capable
> > > enough
> > > > > to
> > > > > > fix those issues in one minute or so. If you think your
> proposal
> > > can
> > > > > be
> > > > > > easily fixed in one minute or so, I would really appreciate it if
> > you
> > > > can
> > > > > > think through your proposal and fix it in the first place :)
> > > > > >
> > > > > > For your information, I have had several long discussions with my
> > > > > > colleagues at Alibaba and also Becket on this FLIP. We have
> > seriously
> > > > > > considered your proposals and discussed in detail what are the
> > > > pros/cons
> > > > > > and whether we can improve these solutions. The initial version
> of
> > > this
> > > > > > FLIP (which allows the source operator to specify checkpoint
> > > intervals)
> > > > > > does not get enough support due to concerns of not being generic
> > > (i.e.
> > > > > > users need to specify checkpoint intervals on a per-source
> basis).
> > It
> > > > is
> > > > > > only after I updated the FLIP to use the job-level
> > > > > > execution.checkpointing.interval-during-backlog, then they agree
> to
> > > > give
> > > > > +1
> > > > > > to the FLIP. What I want to tell you is that your suggestions
> have
> > > been
> > > > > > taken seriously, and the quality of the FLIP has been taken
> > seriously
> > > > > > by all those who have voted. As a result of taking your
> suggestion
> > > > > > seriously and trying to find improvements, we updated the FLIP to
> > use
> > > > > > isProcessingBacklog.
> > > > > >
> > > > > > I am wondering, do you think it will be useful to discuss
> > > face-to-face
> > > > > via
> > > > > > video conference call? It is not just between you and me. We can
> > > invite
> > > > > the
> > > > > > developers who are interested to join and help with the
> discussion.
> > > > That
> > > > > > might improve communication efficiency and help us understand
> each
> > > > other
> > > > > > better :)
> > > > > >
> > > > > > I am writing this long email to hopefully get your
> understanding. I
> > > > care
> > > > > > much more about the quality of the eventual solution rather than
> > who
> > > > > > proposed the solution. Please bear with me and see my comments
> > > inline,
> > > > > with
> > > > > > an explanation of the pros/cons of these proposals.
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski <
> > > > piotr.nowoj...@gmail.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Guys,
> > > > > > >
> > > > > > > I would like to ask you again, to spend a bit more effort on
> > trying
> > > > to
> > > > > > find
> > > > > > > solutions, not just pointing out problems. For 1.5 months,
> > > > > > > the discussion doesn't go in circle, but I'm suggesting a
> > solution,
> > > > you
> > > > > > are
> > > > > > > trying to undermine it with some arguments, I'm coming
> > > > > > > back with a fix, often an extremely easy one, only for you to
> try
> > > to
> > > > > find
> > > > > > > yet another "issue". It doesn't bode well, if you are finding
> > > > > > > a "problem" that can be solved with a minute or so of thinking
> or
> > > > even
> > > > > > has
> > > > > > > already been solved.
> > > > > > >
> > > > > > > I have provided you so far with at least three distinct
> solutions
> > > > that
> > > > > > > could address your exact target use-case. Two [1][2] generic
> > > > > > > enough to be probably good enough for the foreseeable future,
> one
> > > > > > > intermediate and not generic [3] but which wouldn't
> > > > > > > require @Public API changes or some custom hidden interfaces.
> > > > > >
> > > > > >
> > > > > > > All in all:
> > > > > > > - [1] with added metric hints like "isProcessingBacklog" solves
> > > your
> > > > > > target
> > > > > > > use case pretty well. Downside is having to improve
> > > > > > >   how JM is collecting/aggregating metrics
> > > > > > >
> > > > > >
> > > > > > Here is my analysis of this proposal compared to the current
> > approach
> > > > in
> > > > > > the FLIP-309.
> > > > > >
> > > > > > pros:
> > > > > > - No need to add the public API
> > > > > > SplitEnumeratorContext#setIsProcessingBacklog.
> > > > > > cons:
> > > > > > - Need to add a public API that subclasses of SourceReader can
> use
> > to
> > > > > > specify its IsProcessingBacklog metric value.
> > > > > > - Source Coordinator needs to periodically pull the
> > > isProcessingBacklog
> > > > > > metrics from all TMs throughout the job execution.
> > > > > >
> > > > > > Here is why I think the cons outweigh the pros:
> > > > > > 1) JM needs to collect/aggregate metrics with extra runtime
> > overhead,
> > > > > which
> > > > > > is not necessary for the target use-case with the push-based
> > approach
> > > > in
> > > > > > FLIP-309.
> > > > > > 2) For the target use-case, it is simpler and more intuitive for
> > > source
> > > > > > operators (e.g. HybridSource, MySQL CDC source) to be able to set
> > its
> > > > > > isProcessingBacklog status in the SplitEnumerator. This is
> because
> > > the
> > > > > > switch between bounded/unbounded stages happens in their
> > > > SplitEnumerator.
> > > > > >
> > > > > >
> > > > > >
> > > > > > > - [2] is basically an equivalent of [1], replacing metrics with
> > > > events.
> > > > > > It
> > > > > > > also is a superset of your proposal
> > > > > > >
> > > > > >
> > > > > > Previously, I thought you meant to add a generic logic in
> > > > > SourceReaderBase
> > > > > > to read existing metrics (e.g. backpressure) and emit the
> > > > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I
> have
> > > > > > misunderstood your suggestions.
> > > > > >
> > > > > > After double-checking your previous suggestion, I am wondering if
> > you
> > > > are
> > > > > > OK with the following approach:
> > > > > >
> > > > > > - Add a job-level config
> > > > execution.checkpointing.interval-during-backlog
> > > > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > > > isProcessingBacklog).
> > > > > > - When this API is invoked, it internally sends an
> > > > > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > > > > - SourceCoordinator should keep track of the latest
> > > isProcessingBacklog
> > > > > > status from all its subtasks. And for now, we will hardcode the
> > logic
> > > > > such
> > > > > > that if any source reader says it is under backlog, then
> > > > > > execution.checkpointing.interval-during-backlog is used.
> > > > > >
> > > > > > This approach looks good to me as it can achieve the same
> > performance
> > > > > with
> > > > > > the same number of public APIs for the target use-case. And I
> > suppose
> > > > in
> > > > > > the future we might be able to re-use this API for source reader
> to
> > > set
> > > > > its
> > > > > > backlog status based on its backpressure metrics, which could be
> an
> > > > extra
> > > > > > advantage over the current approach.
> > > > > >
> > > > > > Do you think we can agree to adopt the approach described above?
> > > > > >
> > > > > >
> > > > > > > - [3] yes, it's hacky, but it's a solution that could be thrown
> > away
> > > > once
> > > > > > > we implement [1] or [2] . The only real theoretical
> > > > > > >   downside is that it cannot control the long checkpoint
> exactly
> > > > (short
> > > > > > > checkpoint interval has to be a divisor of the long checkpoint
> > > > > > > >   interval), but I simply cannot imagine a practical use where
> > that
> > > > > would
> > > > > > > be a blocker for a user. Please..., someone wanting to set
> > > > > > >   short checkpoint interval to 3min and long to 7 minutes, and
> > that
> > > > > > someone
> > > > > > > can not accept the long interval to be 9 minutes?
> > > > > > >   And that's even ignoring the fact that if someone has an
> issue
> > > with
> > > > > > the 3
> > > > > > > minutes checkpoint interval, I can hardly think that merely
> > > > > > >   doubling the interval to 7 minutes would significantly solve
> > any
> > > > > > problem
> > > > > > > for that user.
> > > > > > >
> > > > > >
> > > > > > Yes, this is a fabricated example that shows
> > > > > > execution.checkpointing.interval-during-backlog might not be
> > > accurately
> > > > > > enforced with this option. I think you are probably right that it
> > > might
> > > > > not
> > > > > > matter that much. I just think we should try our best to make
> Flink
> > > > > public
> > > > > > API's semantics (including configuration) clear, simple, and
> > > > enforceable.
> > > > > > If we can make the user-facing configuration enforceable at the
> > cost
> > > of
> > > > > an
> > > > > > extra developer facing API (i.e. setProcessingBacklog(...)), I
> > would
> > > > > prefer
> > > > > > to do this.
> > > > > >
> > > > > > It seems that we both agree that option [2] is better than [3]. I
> > > will
> > > > > skip
> > > > > > the further comments for this option and we can probably focus on
> > > > > > option [2] :)
> > > > > >
> > > > > >
> > > > > > > Dong a long time ago you wrote:
> > > > > > > > Sure. Then let's decide the final solution first.
> > > > > > >
> > > > > > > Have you thought about that? Maybe I'm wrong but I don't
> remember
> > > you
> > > > > > > describing in any of your proposals how they could be
> > > > > > > extended in the future, to cover more generic cases. Regardless
> > if
> > > > you
> > > > > > > either don't believe in the generic solution or struggle to
> > > > > > >
> > > > > >
> > > > > > Yes, I have thought about the plan to extend the current FLIP to
> > > > support
> > > > > > metrics (e.g. backpressure) based solution you described earlier.
> > > > > Actually,
> > > > > > I mentioned multiple times in the earlier email that your
> > suggestion
> > > of
> > > > > > using metrics is valuable and I will do this in a follow-up FLIP.
> > > > > >
> > > > > > Here are my comments from the previous email:
> > > > > > - See "I will add follow-up FLIPs to make use of the event-time
> > > metrics
> > > > > and
> > > > > > backpressure metrics" from Jul 3, 2023, 6:39 PM
> > > > > > - See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
> > > > > > - See "we will create a followup FLIP (probably in FLIP-328)"
> from
> > > Jun
> > > > > 29,
> > > > > > 2023, 11:01 AM
> > > > > >
> > > > > > Frankly speaking, I think the idea around using the backpressure
> > > > metrics
> > > > > > still needs a bit more thinking before we can propose a FLIP.
> But I
> > > am
> > > > > > pretty sure we can make use of the watermark/event-time to
> > determine
> > > > the
> > > > > > backlog status.
> > > > > >
> > > > > > > grasp it, if you can come back with something that can be easily
> > > > extended
> > > > > > > in the future, up to a point where one could implement
> > > > > > > something similar to this backpressure detecting algorithm
> that I
> > > > > > mentioned
> > > > > > > many times before, I would be happy to discuss and
> > > > > > > support it.
> > > > > > >
> > > > > >
> > > > > > Here is my idea of extending the source reader to support
> > > > > event-time-based
> > > > > > backlog detecting algorithms:
> > > > > >
> > > > > > - Add a job-level config such as
> > watermark-lag-threshold-for-backlog.
> > > > If
> > > > > > any source reader determines that the event-timestamp is
> available
> > > and
> > > > > the
> > > > > > system-time - watermark exceeds this threshold, then the source
> > > reader
> > > > > > considers its isProcessingBacklog=true.
> > > > > > - The source reader can send an event to the source coordinator.
> > Note
> > > > > that
> > > > > > this might be doable in the SourceReaderBase without adding any
> > > public
> > > > > API
> > > > > > which the concrete SourceReader subclass needs to explicitly
> > invoke.
> > > > > > - And in the future if FLIP-325 is accepted, instead of
> sending
> > > the
> > > > > > event to SourceCoordinator and let SourceCoordinator inform the
> > > > > checkpoint
> > > > > > coordinator, the source reader might just emit the information as
> > > part
> > > > of
> > > > > > the RecordAttributes and let the two-phase commit sink inform the
> > > > > > checkpoint coordinator.
> > > > > >
> > > > > > Note that this is a sketch of the idea and it might need further
> > > > > > improvement. I just hope you understand that we have thought
> about
> > > this
> > > > > > idea and did quite a lot of thinking for these design options. If
> > it
> > > is
> > > > > OK
> > > > > > with you, I hope we can make incremental progress and discuss the
> > > > > > metrics-based solution separately in a follow-up FLIP.
> > > > > >
> > > > > > Last but not least, thanks for taking so much time to leave
> > comments
> > > > and
> > > > > > help us improve the FLIP. Please kindly bear with us in this
> > > > discussion.
> > > > > I
> > > > > > am looking forward to collaborating with you to find the best
> > design
> > > > for
> > > > > > the target use-cases.
> > > > > >
> > > > > > Best,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > > Hang, about your points 1. and 2., do you think those problems
> > are
> > > > > > > insurmountable and blockers for that counter proposal?
> > > > > > >
> > > > > > > > 1. It is hard to find the error checkpoint.
> > > > > > >
> > > > > > > No it's not, please take a look at what I exactly proposed and
> > > maybe
> > > > at
> > > > > > the
> > > > > > > code.
> > > > > > >
> > > > > > > > 2. (...) The failed checkpoint may make them think the job is
> > > > > > unhealthy.
> > > > > > >
> > > > > > > Please read again what I wrote in [3]. I'm mentioning there a
> > > > solution
> > > > > > for
> > > > > > > this exact "problem".
> > > > > > >
> > > > > > > About the necessity of the config value, I'm still not
> convinced
> > > > that's
> > > > > > > needed from the start, but yes we can add some config option
> > > > > > > if you think otherwise. This option, if named properly, could
> be
> > > > > re-used
> > > > > > in
> > > > > > > the future for different solutions, so that's fine by me.
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > [1] Introduced in my very first e-mail from 23 May 2023, 16:26,
> > and
> > > > > > refined
> > > > > > > later with point "2." in my e-mail from 16 June 2023, 17:58
> > > > > > > [2] Section "2. ===============" in my e-mail from 30 June
> 2023,
> > > > 16:34
> > > > > > > [3] Section "3. ===============" in my e-mail from 30 June
> 2023,
> > > > 16:34
> > > > > > >
> > > > > > > All times in CEST.
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
