Thanks Dong!

Piotrek

Tue, 18 Jul 2023 at 06:04 Dong Lin <lindon...@gmail.com> wrote:

> Hi all,
>
> I have updated FLIP-309 as suggested by Piotr to include a reference to
> FLIP-328 in the future work section.
>
> Piotr, Stephan, and I discussed offline the choice
> between execution.checkpointing.max-interval and
> execution.checkpointing.interval-during-backlog.
> The advantage of using "max-interval" is that Flink runtime can have more
> flexibility to decide when/how to adjust checkpointing intervals (based on
> information other than backlog). The advantage of using
> "interval-during-backlog" is that it is clearer to the user when/how this
> configured interval is used. Since there is no immediate need for the extra
> flexibility as of this FLIP, we agreed to go with interval-during-backlog
> for now. And we can rename this config to e.g.
> execution.checkpointing.max-interval when needed in the future.
>
> Thanks everyone for all the reviews and suggestions! And special thanks to
> Piotr and Stephan for taking extra time to provide detailed reviews and
> suggestions offline!
>
> Since there is no further comment, I will open the voting thread for this
> FLIP.
>
> Cheers,
> Dong
>
>
> On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > We had a lot of off-line discussions. As a result I would suggest
> > dropping the idea of introducing an end-to-end-latency concept, until
> > we can properly implement it, which will require more designing and
> > experimenting. I would suggest starting with a more manual solution,
> > where the user needs to configure concrete parameters, like
> > `execution.checkpointing.max-interval` or `execution.flush-interval`.
> >
> > FLIP-309 looks good to me, I would just rename
> > `execution.checkpointing.interval-during-backlog` to
> > `execution.checkpointing.max-interval`.
> >
> > I would also reference, as future work, that a solution allowing
> > `isProcessingBacklog` to be set for sources like Kafka will be
> > introduced via FLIP-328 [1].
> >
> > Best,
> > Piotrek
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >
> > Wed, 12 Jul 2023 at 03:49 Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > I think I understand your motivation for suggesting
> > > execution.slow-end-to-end-latency now. Please see my followup comments
> > > (after the previous email) inline.
> > >
> > > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski <pnowoj...@apache.org>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the updates, a couple of comments:
> > > >
> > > > > If a record is generated by a source when the source's
> > > > > isProcessingBacklog is true, or some of the records used to
> > > > > derive this record (by an operator) has isBacklog = true, then this
> > > > > record should have isBacklog = true. Otherwise,
> > > > > this record should have isBacklog = false.
> > > >
> > > > nit:
> > > > I think this conflicts with the "Rule of thumb for non-source
> > > > operators to set isBacklog = true for the records it emits:" section
> > > > later on, in the case where an operator has mixed
> > > > isBacklog = false and isBacklog = true inputs.
> > > >
> > > > > execution.checkpointing.interval-during-backlog
> > > >
> > > > Do we need to define this as an interval config parameter? Won't that
> > > > add an option that will be almost instantly deprecated
> > > > because what we actually would like to have is:
> > > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > > >
> > >
> > > I guess you are suggesting that we should allow users to specify a
> > > higher end-to-end latency budget for those records that are emitted by
> > > a two-phase commit sink than for those records that are emitted by a
> > > non-two-phase commit sink.
> > >
> > > My concern with this approach is that it will increase the complexity
> > > of the definition of "processing latency requirement", as well as the
> > > complexity of the Flink runtime code that handles it. Currently,
> > > FLIP-325 defines end-to-end latency as an attribute of the records that
> > > is statically assigned when the record is generated at the source,
> > > regardless of how it will be emitted later in the topology. If we make
> > > the changes proposed above, we would need to define the latency
> > > requirement w.r.t. the attribute of the operators that it travels
> > > through before its result is emitted, which is less intuitive and more
> > > complex.
> > >
> > > For now, it is not clear whether it is necessary to have two categories
> > > of latency requirement for the same job. Maybe it is reasonable to
> > > assume that if a job has a two-phase commit sink and the user is OK to
> > > emit some results at a 1-minute interval, then more likely than not the
> > > user is also OK to emit all results at a 1-minute interval, including
> > > those that go through a non-two-phase commit sink?
> > >
> > > If we do want to support different end-to-end latency depending on
> > > whether the record is emitted by a two-phase commit sink, I would prefer
> > > to still use execution.checkpointing.interval-during-backlog instead of
> > > execution.slow-end-to-end-latency. This allows us to keep the concept
> > > of end-to-end latency simple. Also, by explicitly including
> > > "checkpointing interval" in the name of the config that directly affects
> > > the checkpointing interval, we can make it easier and more intuitive for
> > > users to understand the impact and set proper values for such configs.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > > Maybe we can introduce only `execution.slow-end-to-end-latency` (%
> > > > maybe a better name), and for the time being
> > > > use it as the checkpoint interval value during backlog?
> > >
> > >
> > > > Or do you envision that in the future users will be configuring only:
> > > > - execution.end-to-end-latency
> > > > and only optionally:
> > > > - execution.checkpointing.interval-during-backlog
> > > > ?
> > > >
> > > > Best Piotrek
> > > >
> > > > PS, I will read the summary that you have just published later, but I
> > > > think we don't need to block this FLIP on the
> > > > existence of that high level summary.
> > > >
> > > > Tue, 11 Jul 2023 at 17:49 Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr and everyone,
> > > > >
> > > > > I have documented the vision with a summary of the existing work in
> > > > > this doc. Please feel free to review/comment/edit this doc. Looking
> > > > > forward to working with you together in this line of work.
> > > > >
> > > > > https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski
> > > > > <piotr.nowoj...@gmail.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Dong and I chatted offline about the above-mentioned issues
> > > > > > (thanks for that offline chat, I think it helped both of us a
> > > > > > lot). The summary is below.
> > > > > >
> > > > > > > Previously, I thought you meant to add a generic logic in
> > > > > > > SourceReaderBase to read existing metrics (e.g. backpressure) and
> > > > > > > emit the IsProcessingBacklogEvent to SourceCoordinator. I am sorry
> > > > > > > if I have misunderstood your suggestions.
> > > > > > >
> > > > > > > After double-checking your previous suggestion, I am wondering if
> > > > > > > you are OK with the following approach:
> > > > > > >
> > > > > > > - Add a job-level config
> > > > > > > execution.checkpointing.interval-during-backlog
> > > > > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > > > > isProcessingBacklog).
> > > > > > > - When this API is invoked, it internally sends an
> > > > > > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > > > > > - SourceCoordinator should keep track of the latest
> > > > > > > isProcessingBacklog status from all its subtasks. And for now, we
> > > > > > > will hardcode the logic such that if any source reader says it is
> > > > > > > under backlog, then
> > > > > > > execution.checkpointing.interval-during-backlog is used.
> > > > > > >
> > > > > > > This approach looks good to me as it can achieve the same
> > > > > > > performance with the same number of public APIs for the target
> > > > > > > use-case. And I suppose in the future we might be able to re-use
> > > > > > > this API for source reader to set its backlog status based on its
> > > > > > > backpressure metrics, which could be an extra advantage over the
> > > > > > > current approach.
> > > > > > >
> > > > > > > Do you think we can agree to adopt the approach described above?
> > > > > >
> > > > > > Yes, I think that's a viable approach. I would be perfectly fine
> > > > > > with not introducing
> > > > > > `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog)`
> > > > > > and not sending the `SourceReaderBacklogEvent` from SourceReader to
> > > > > > JM in this FLIP. It could be implemented once we decide to add
> > > > > > some more generic ways of detecting backlog/backpressure on the
> > > > > > SourceReader level.
> > > > > >
> > > > > > I think we could also just keep the current proposal of adding
> > > > > > `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the
> > > > > > sources that can set it on the `SplitEnumerator` level. Later we
> > > > > > could merge this with other mechanisms of detecting
> > > > > > "isProcessingBacklog", like those based on watermark lag,
> > > > > > backpressure, etc., via some component running on the JM.
> > > > > >
> > > > > > At the same time I'm fine with having the "isProcessingBacklog"
> > > > > > concept to switch the runtime back and forth between high and low
> > > > > > latency modes instead of "backpressure". In FLIP-325 I have asked:
> > > > > >
> > > > > > > I think there is one thing that has been discussed neither
> > > > > > > here nor in FLIP-309. Given that we have three dimensions:
> > > > > > > - e2e latency/checkpointing interval
> > > > > > > - enabling some kind of batching/buffering on the operator level
> > > > > > > - how much resources we want to allocate to the job
> > > > > > >
> > > > > > > How do we want Flink to adjust itself between those three? For
> > > > > > > example:
> > > > > > > a) Should we assume that a given job has a fixed amount of
> > > > > > >   assigned resources and make it paramount that Flink doesn't
> > > > > > >   exceed those available resources? So in case of backpressure,
> > > > > > >   we should extend checkpointing intervals, emit records less
> > > > > > >   frequently and in batches.
> > > > > > > b) Or should we assume that the amount of resources is flexible
> > > > > > >   (up to a point?), and the desired e2e latency is the paramount
> > > > > > >   aspect? So in case of backpressure, we should still adhere to
> > > > > > >   the configured e2e latency, and wait for the user or autoscaler
> > > > > > >   to scale up the job?
> > > > > > >
> > > > > > > In case of a), I think the concept of "isProcessingBacklog" is
> > > > > > > not needed, we could steer the behaviour only using the
> > > > > > > backpressure information.
> > > > > > >
> > > > > > > On the other hand, in case of b), "isProcessingBacklog"
> > > > > > > information might be helpful, to let Flink know that we can
> > > > > > > safely decrease the e2e latency/checkpoint interval even if there
> > > > > > > is no backpressure, to use fewer resources (and let the
> > > > > > > autoscaler scale down the job).
> > > > > > >
> > > > > > > Do we want to have both, or only one of those? Do a) and b)
> > > > > > > complement one another? If job is backpressured, we should follow
> > > > > > > a) and expose to autoscaler/users information "Hey! I'm barely
> > > > > > > keeping up! I need more resources!". While, when there is no
> > > > > > > backpressure and latency doesn't matter
> > > > > > > (isProcessingBacklog=true), we can limit the resource usage
> > > > > >
> > > > > > After thinking this over:
> > > > > > - the case that we don't have "isProcessingBacklog" information,
> > > > > >   but the source operator is back pressured, must be intermittent.
> > > > > >   Either back pressure will go away, or shortly we should reach
> > > > > >   the "isProcessingBacklog" state anyway
> > > > > > - and even if we implement some back pressure detecting algorithm
> > > > > >   to switch the runtime into the "high latency mode", we can
> > > > > >   always report that as "isProcessingBacklog" anyway, as runtime
> > > > > >   should react the same way in both cases (backpressure and
> > > > > >   "isProcessingBacklog" states).
> > > > > >
> > > > > > ===============
> > > > > >
> > > > > > With a common understanding of the final solution that we want to
> > > > > > have in the future, I'm pretty much fine with the current
> > > > > > FLIP-309 proposal, with a couple of remarks:
> > > > > > 1. Could you include in FLIP-309 the long term solution as we
> > > > > >     have discussed?
> > > > > >         a) Would be nice to have some diagram showing how the
> > > > > >             "isProcessingBacklog" information would be travelling,
> > > > > >             being aggregated and what will be done with that
> > > > > >             information (from SourceReader/SplitEnumerator to some
> > > > > >             "component" aggregating it, and then ... ?)
> > > > > > 2. For me "processing backlog" doesn't necessarily equate to
> > > > > >     "backpressure" (HybridSource can be both NOT backpressured and
> > > > > >     processing backlog at the same time). If you think the same
> > > > > >     way, can you include that definition of "processing backlog"
> > > > > >     in the FLIP including its relation to the backpressure state?
> > > > > >     If not, we need to align on that definition first :)
> > > > > >
> > > > > > Also I'm missing a big picture description, that would show what
> > > > > > you are trying to achieve and what's the overarching vision
> > > > > > behind all of the current and future FLIPs that you are planning
> > > > > > in this area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
> > > > > > Or was it described somewhere and I've missed it?
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > >
> > > > > >
> > > > > > Thu, 6 Jul 2023 at 06:25 Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > I am sorry if you feel unhappy or upset with us for not
> > > > > > > following/fixing your proposal. It is not my intention to give you
> > > > > > > this feeling. After all, we are all trying to make Flink better, to
> > > > > > > support more use-cases with the most maintainable code. I hope you
> > > > > > > understand that just like you, I have also been doing my best to
> > > > > > > think through various design options and taking time to evaluate
> > > > > > > the pros/cons. Eventually, we probably still need to reach
> > > > > > > consensus by clearly listing and comparing the objective pros/cons
> > > > > > > of different proposals and identifying the best choice.
> > > > > > >
> > > > > > > Regarding your concern (or frustration) that we are always finding
> > > > > > > issues in your proposal, I would say it is normal (and probably
> > > > > > > necessary) for developers to find pros/cons in each other's
> > > > > > > solutions, so that we can eventually pick the right one. I will
> > > > > > > appreciate anyone who can correctly pinpoint the concrete issue in
> > > > > > > my proposal so that I can improve it or choose an alternative
> > > > > > > solution.
> > > > > > >
> > > > > > > Regarding your concern that we are not spending enough effort to
> > > > > > > find solutions and that the problem in your solution can be solved
> > > > > > > in a minute, I would like to say that is not true. For each of your
> > > > > > > previous proposals, I typically spent 1+ hours thinking through
> > > > > > > your proposal to understand whether it works and why it does not
> > > > > > > work, and another 1+ hour to write down the details and explain why
> > > > > > > it does not work. And I have had a variety of offline discussions
> > > > > > > with my colleagues discussing various proposals (including yours)
> > > > > > > with 6+ hours in total. Maybe I am not capable enough to fix those
> > > > > > > issues in one minute or so. If you think your proposal can be
> > > > > > > easily fixed in one minute or so, I would really appreciate it if
> > > > > > > you can think through your proposal and fix it in the first
> > > > > > > place :)
> > > > > > >
> > > > > > > For your information, I have had several long discussions with my
> > > > > > > colleagues at Alibaba and also Becket on this FLIP. We have
> > > > > > > seriously considered your proposals and discussed in detail what
> > > > > > > the pros/cons are and whether we can improve these solutions. The
> > > > > > > initial version of this FLIP (which allows the source operator to
> > > > > > > specify checkpoint intervals) did not get enough support due to
> > > > > > > concerns of not being generic (i.e. users need to specify
> > > > > > > checkpoint intervals on a per-source basis). It was only after I
> > > > > > > updated the FLIP to use the job-level
> > > > > > > execution.checkpointing.interval-during-backlog that they agreed
> > > > > > > to give +1 to the FLIP. What I want to tell you is that your
> > > > > > > suggestions have been taken seriously, and the quality of the FLIP
> > > > > > > has been taken seriously by all those who have voted. As a result
> > > > > > > of taking your suggestion seriously and trying to find
> > > > > > > improvements, we updated the FLIP to use isProcessingBacklog.
> > > > > > >
> > > > > > > I am wondering, do you think it will be useful to discuss
> > > > > > > face-to-face via video conference call? It is not just between you
> > > > > > > and me. We can invite the developers who are interested to join and
> > > > > > > help with the discussion. That might improve communication
> > > > > > > efficiency and help us understand each other better :)
> > > > > > >
> > > > > > > I am writing this long email to hopefully get your understanding.
> > > > > > > I care much more about the quality of the eventual solution rather
> > > > > > > than who proposed the solution. Please bear with me and see my
> > > > > > > comments inline, with an explanation of the pros/cons of these
> > > > > > > proposals.
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski
> > > > > > > <piotr.nowoj...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Guys,
> > > > > > > >
> > > > > > > > I would like to ask you again, to spend a bit more effort on
> > > > > > > > trying to find solutions, not just pointing out problems. For 1.5
> > > > > > > > months, the discussion doesn't go in circle, but I'm suggesting a
> > > > > > > > solution, you are trying to undermine it with some arguments, I'm
> > > > > > > > coming back with a fix, often an extremely easy one, only for you
> > > > > > > > to try to find yet another "issue". It doesn't bode well, if you
> > > > > > > > are finding a "problem" that can be solved with a minute or so of
> > > > > > > > thinking or even has already been solved.
> > > > > > > >
> > > > > > > > I have provided you so far with at least three distinct solutions
> > > > > > > > that could address your exact target use-case. Two [1][2] generic
> > > > > > > > enough to be probably good enough for the foreseeable future, one
> > > > > > > > intermediate and not generic [3] but which wouldn't
> > > > > > > > require @Public API changes or some custom hidden interfaces.
> > > > > > >
> > > > > > >
> > > > > > > > All in all:
> > > > > > > > - [1] with added metric hints like "isProcessingBacklog" solves
> > > > > > > >   your target use case pretty well. Downside is having to improve
> > > > > > > >   how JM is collecting/aggregating metrics
> > > > > > > >
> > > > > > >
> > > > > > > Here is my analysis of this proposal compared to the current
> > > > > > > approach in FLIP-309.
> > > > > > >
> > > > > > > pros:
> > > > > > > - No need to add the public API
> > > > > > > SplitEnumeratorContext#setIsProcessingBacklog.
> > > > > > > cons:
> > > > > > > - Need to add a public API that subclasses of SourceReader can use
> > > > > > > to specify their IsProcessingBacklog metric value.
> > > > > > > - Source Coordinator needs to periodically pull the
> > > > > > > isProcessingBacklog metrics from all TMs throughout the job
> > > > > > > execution.
> > > > > > >
> > > > > > > Here is why I think the cons outweigh the pros:
> > > > > > > 1) JM needs to collect/aggregate metrics with extra runtime
> > > > > > > overhead, which is not necessary for the target use-case with the
> > > > > > > push-based approach in FLIP-309.
> > > > > > > 2) For the target use-case, it is simpler and more intuitive for
> > > > > > > source operators (e.g. HybridSource, MySQL CDC source) to be able
> > > > > > > to set their isProcessingBacklog status in the SplitEnumerator.
> > > > > > > This is because the switch between bounded/unbounded stages happens
> > > > > > > in their SplitEnumerator.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > - [2] is basically an equivalent of [1], replacing metrics with
> > > > > > > >   events. It also is a superset of your proposal
> > > > > > > >
> > > > > > >
> > > > > > > Previously, I thought you meant to add a generic logic in
> > > > > > > SourceReaderBase to read existing metrics (e.g. backpressure) and
> > > > > > > emit the IsProcessingBacklogEvent to SourceCoordinator. I am sorry
> > > > > > > if I have misunderstood your suggestions.
> > > > > > >
> > > > > > > After double-checking your previous suggestion, I am wondering if
> > > > > > > you are OK with the following approach:
> > > > > > >
> > > > > > > - Add a job-level config
> > > > > > > execution.checkpointing.interval-during-backlog
> > > > > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > > > > isProcessingBacklog).
> > > > > > > - When this API is invoked, it internally sends an
> > > > > > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > > > > > - SourceCoordinator should keep track of the latest
> > > > > > > isProcessingBacklog status from all its subtasks. And for now, we
> > > > > > > will hardcode the logic such that if any source reader says it is
> > > > > > > under backlog, then
> > > > > > > execution.checkpointing.interval-during-backlog is used.
> > > > > > >
> > > > > > > This approach looks good to me as it can achieve the same
> > > > > > > performance with the same number of public APIs for the target
> > > > > > > use-case. And I suppose in the future we might be able to re-use
> > > > > > > this API for source reader to set its backlog status based on its
> > > > > > > backpressure metrics, which could be an extra advantage over the
> > > > > > > current approach.
> > > > > > >
> > > > > > > Do you think we can agree to adopt the approach described above?
> > > > > > >
> > > > > > >
> > > > > > > > - [3] yes, it's hacky, but it's a solution that could be thrown
> > > > > > > >   away once we implement [1] or [2]. The only real theoretical
> > > > > > > >   downside is that it cannot control the long checkpoint interval
> > > > > > > >   exactly (the short checkpoint interval has to be a divisor of
> > > > > > > >   the long checkpoint interval), but I simply cannot imagine a
> > > > > > > >   practical use where that would be a blocker for a user.
> > > > > > > >   Please..., someone wanting to set the short checkpoint interval
> > > > > > > >   to 3 minutes and the long one to 7 minutes, and that someone
> > > > > > > >   cannot accept the long interval to be 9 minutes?
> > > > > > > >   And that's even ignoring the fact that if someone has an issue
> > > > > > > >   with the 3 minutes checkpoint interval, I can hardly think that
> > > > > > > >   merely doubling the interval to 7 minutes would significantly
> > > > > > > >   solve any problem for that user.
> > > > > > > >
> > > > > > >
> > > > > > > Yes, this is a fabricated example that shows
> > > > > > > execution.checkpointing.interval-during-backlog might not be
> > > > > > > accurately enforced with this option. I think you are probably
> > > > > > > right that it might not matter that much. I just think we should
> > > > > > > try our best to make Flink public API's semantics (including
> > > > > > > configuration) clear, simple, and enforceable. If we can make the
> > > > > > > user-facing configuration enforceable at the cost of an extra
> > > > > > > developer facing API (i.e. setProcessingBacklog(...)), I would
> > > > > > > prefer to do this.
> > > > > > >
> > > > > > > It seems that we both agree that option [2] is better than [3]. I
> > > > > > > will skip the further comments for this option and we can probably
> > > > > > > focus on option [2] :)
> > > > > > >
> > > > > > >
> > > > > > > > Dong a long time ago you wrote:
> > > > > > > > > Sure. Then let's decide the final solution first.
> > > > > > > >
> > > > > > > > Have you thought about that? Maybe I'm wrong but I don't
> > > > > > > > remember you describing in any of your proposals how they could
> > > > > > > > be extended in the future, to cover more generic cases.
> > > > > > > > Regardless if you either don't believe in the generic solution
> > > > > > > > or struggle to
> > > > > > > >
> > > > > > >
> > > > > > > Yes, I have thought about the plan to extend the current FLIP to
> > > > > > > support metrics (e.g. backpressure) based solution you described
> > > > > > > earlier. Actually, I mentioned multiple times in the earlier email
> > > > > > > that your suggestion of using metrics is valuable and I will do
> > > > > > > this in a follow-up FLIP.
> > > > > > >
> > > > > > > Here are my comments from the previous email:
> > > > > > > - See "I will add follow-up FLIPs to make use of the event-time
> > > > > > > metrics and backpressure metrics" from Jul 3, 2023, 6:39 PM
> > > > > > > - See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
> > > > > > > - See "we will create a followup FLIP (probably in FLIP-328)"
> > > > > > > from Jun 29, 2023, 11:01 AM
> > > > > > >
> > > > > > > Frankly speaking, I think the idea around using the backpressure
> > > > > > > metrics still needs a bit more thinking before we can propose a
> > > > > > > FLIP. But I am pretty sure we can make use of the
> > > > > > > watermark/event-time to determine the backlog status.
> > > > > > >
> > > > > > > > grasp it, if you can come back with something that can be easily
> > > > > > > > extended in the future, up to a point where one could implement
> > > > > > > > something similar to this backpressure detecting algorithm that I
> > > > > > > > mentioned many times before, I would be happy to discuss and
> > > > > > > > support it.
> > > > > > > >
> > > > > > >
> > > > > > > Here is my idea of extending the source reader to support
> > > > > > > event-time-based backlog detecting algorithms:
> > > > > > >
> > > > > > > - Add a job-level config such as
> > > > > > > watermark-lag-threshold-for-backlog. If any source reader
> > > > > > > determines that the event-timestamp is available and the
> > > > > > > system-time - watermark exceeds this threshold, then the source
> > > > > > > reader considers its isProcessingBacklog=true.
> > > > > > > - The source reader can send an event to the source coordinator.
> > > > > > > Note that this might be doable in the SourceReaderBase without
> > > > > > > adding any public API which the concrete SourceReader subclass
> > > > > > > needs to explicitly invoke.
> > > > > > > - And in the future if FLIP-325 is accepted, instead of sending
> > > > > > > the event to SourceCoordinator and letting SourceCoordinator
> > > > > > > inform the checkpoint coordinator, the source reader might just
> > > > > > > emit the information as part of the RecordAttributes and let the
> > > > > > > two-phase commit sink inform the checkpoint coordinator.
> > > > > > >
> > > > > > > Note that this is a sketch of the idea and it might need further
> > > > > > > improvement. I just hope you understand that we have thought about
> > > > > > > this idea and did quite a lot of thinking for these design options.
> > > > > > > If it is OK with you, I hope we can make incremental progress and
> > > > > > > discuss the metrics-based solution separately in a follow-up FLIP.
> > > > > > >
> > > > > > > Last but not least, thanks for taking so much time to leave
> > > > > > > comments and help us improve the FLIP. Please kindly bear with us
> > > > > > > in this discussion. I am looking forward to collaborating with you
> > > > > > > to find the best design for the target use-cases.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > > Hang, about your points 1. and 2., do you think those problems
> > > > > > > > are insurmountable and blockers for that counter proposal?
> > > > > > > >
> > > > > > > > > 1. It is hard to find the error checkpoint.
> > > > > > > >
> > > > > > > > No it's not, please take a look at what I exactly proposed and
> > > > > > > > maybe at the code.
> > > > > > > >
> > > > > > > > > 2. (...) The failed checkpoint may make them think the job is
> > > > > > > > > unhealthy.
> > > > > > > >
> > > > > > > > Please read again what I wrote in [3]. I'm mentioning there a
> > > > > > > > solution for this exact "problem".
> > > > > > > >
> > > > > > > > About the necessity of the config value, I'm still not convinced
> > > > > > > > that's needed from the start, but yes we can add some config
> > > > > > > > option if you think otherwise. This option, if named properly,
> > > > > > > > could be re-used in the future for different solutions, so that's
> > > > > > > > fine by me.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Piotrek
> > > > > > > >
> > > > > > > > [1] Introduced in my very first e-mail from 23 May 2023, 16:26,
> > > > > > > > and refined later with point "2." in my e-mail from 16 June
> > > > > > > > 2023, 17:58
> > > > > > > > [2] Section "2. ===============" in my e-mail from 30 June 2023,
> > > > > > > > 16:34
> > > > > > > > [3] Section "3. ===============" in my e-mail from 30 June 2023,
> > > > > > > > 16:34
> > > > > > > >
> > > > > > > > All times in CEST.
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
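
For readers of this thread, the two rules discussed above can be sketched in
a few lines of Java: (1) if any source reader reports that it is processing
backlog, the interval from execution.checkpointing.interval-during-backlog is
used, and (2) a reader could be considered under backlog when system time
minus watermark exceeds a threshold. This is only an illustrative sketch and
not Flink code. The class BacklogIntervalSketch and all of its method names
are hypothetical and exist only for this example; nothing here is taken from
the FLIP-309 implementation.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, standalone sketch (not Flink code). It keeps the latest
 * backlog flag reported by each source subtask and picks a checkpoint
 * interval from it, following the rule described in the thread.
 */
class BacklogIntervalSketch {
    // Stand-in for execution.checkpointing.interval.
    private final Duration regularInterval;
    // Stand-in for execution.checkpointing.interval-during-backlog.
    private final Duration intervalDuringBacklog;
    // Latest isProcessingBacklog status per subtask index.
    private final Map<Integer, Boolean> backlogBySubtask = new HashMap<>();

    BacklogIntervalSketch(Duration regularInterval, Duration intervalDuringBacklog) {
        this.regularInterval = regularInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    /** Plays the role of receiving a backlog event from one source reader. */
    void onBacklogReported(int subtaskId, boolean isProcessingBacklog) {
        backlogBySubtask.put(subtaskId, isProcessingBacklog);
    }

    /** True if at least one subtask most recently reported backlog. */
    boolean isProcessingBacklog() {
        return backlogBySubtask.containsValue(Boolean.TRUE);
    }

    /** Chooses the interval: the backlog interval if any reader is under backlog. */
    Duration effectiveCheckpointInterval() {
        return isProcessingBacklog() ? intervalDuringBacklog : regularInterval;
    }

    /**
     * Watermark-lag rule sketched for a follow-up FLIP: backlog if
     * (system time - watermark) is strictly greater than the threshold.
     */
    static boolean exceedsWatermarkLag(long systemTimeMs, long watermarkMs, long thresholdMs) {
        return systemTimeMs - watermarkMs > thresholdMs;
    }
}
```

With a regular interval of 30 seconds and a backlog interval of 10 minutes,
one reader reporting backlog is enough for effectiveCheckpointInterval() to
return 10 minutes; once every reader has reported false again, it returns
30 seconds.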
