Looks good to me!

Thanks Dong, Yunfeng and all for your discussion and design.

Best,
Jingsong

On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> wrote:
>
> Thank you Dong for driving this FLIP.
>
> The new design looks good to me!
>
> Best,
> Jark
>
> > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道:
> >
> > Thank you Leonard for the review!
> >
> > Hi Piotr, do you have any comments on the latest proposal?
> >
> > I am wondering if it is OK to start the voting thread this week.
> >
> > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >
> >> Thanks Dong for driving this FLIP forward!
> >>
> >> Introducing  `backlog status` concept for flink job makes sense to me as
> >> following reasons:
> >>
> >> From concept/API design perspective, it’s more general and natural than
> >> above proposals as it can be used in HybridSource for bounded records, CDC
> >> Source for history snapshot and general sources like KafkaSource for
> >> historical messages.
> >>
> >> From user cases/requirements, I’ve seen many users manually to set larger
> >> checkpoint interval during backfilling and then set a shorter checkpoint
> >> interval for real-time processing in their production environments as a
> >> flink application optimization. Now, the flink framework can make this
> >> optimization no longer require the user to set the checkpoint interval and
> >> restart the job multiple times.
> >>
> >> Following supporting using larger checkpoint for job under backlog status
> >> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> >> for job under backlog status in the future.
> >>
> >> In short, the updated FLIP looks good to me.
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> Thanks again for proposing the isProcessingBacklog concept.
> >>>
> >>> After discussing with Becket Qin and thinking about this more, I agree it
> >>> is a better idea to add a top-level concept to all source operators to
> >>> address the target use-case.
> >>>
> >>> The main reason that changed my mind is that isProcessingBacklog can be
> >>> described as an inherent/nature attribute of every source instance and
> >> its
> >>> semantics does not need to depend on any specific checkpointing policy.
> >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
> >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> >>> asking users to explicitly configure the per-source behavior, which
> >> indeed
> >>> provides better user experience.
> >>>
> >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> >> no
> >>> longer introduces per-source config that can be used by end-users. While
> >> I
> >>> agree with you that CheckpointTrigger can be a useful feature to address
> >>> additional use-cases, I am not sure it is necessary for the use-case
> >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> >>> in another FLIP?
> >>>
> >>> Can you help take another look at the updated FLIP?
> >>>
> >>> Best,
> >>> Dong
> >>>
> >>>
> >>>
> >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org>
> >>> wrote:
> >>>
> >>>> Hi Dong,
> >>>>
> >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being
> >>>>> "backpressured" at a given time (due to random traffic spikes). Then at
> >>>> any
> >>>>> given time, the chance of the job
> >>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> >> the
> >>>>> backpressure metric once a second, the estimated time for the job
> >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> >> 23163
> >>>>> sec = 6.4 hours.
> >>>>>
> >>>>> This means that the job will effectively always use the longer
> >>>>> checkpointing interval. It looks like a real concern, right?
> >>>>
> >>>> Sorry I don't understand where you are getting those numbers from.
> >>>> Instead of trying to find loophole after loophole, could you try to
> >> think
> >>>> how a given loophole could be improved/solved?
> >>>>
> >>>>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>>>> following reasons.
> >>>>
> >>>> Please propose something. I don't think it's needed.
> >>>>
> >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >>>> APIs
> >>>>> of this alternative approach be more or less usable?
> >>>>
> >>>> Everything that you originally wanted to achieve in FLIP-309, you could
> >> do
> >>>> as well in my proposal.
> >>>> Vide my many mentions of the "hacky solution".
> >>>>
> >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>>>> checkpointing interval to change dynamically even during the unbounded
> >>>>> phase) as it claims?
> >>>>
> >>>> I don't see why not.
> >>>>
> >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> >>>> FLIP-309?
> >>>>
> >>>> Yes
> >>>>
> >>>>> For example, if the APIs of this alternative approach can be decoupled
> >>>> from
> >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>>>> work on this extra use-case with a more advanced/complicated design
> >>>>> separately in a followup work.
> >>>>
> >>>> As I voiced my concerns previously, the current design of FLIP-309 would
> >>>> clog the public API and in the long run confuse the users. IMO It's
> >>>> addressing the
> >>>> problem in the wrong place.
> >>>>
> >>>>> Hmm.. do you mean we can do the following:
> >>>>> - Have all source operators emit a metric named "processingBacklog".
> >>>>> - Add a job-level config that specifies "the checkpointing interval to
> >> be
> >>>>> used when any source is processing backlog".
> >>>>> - The JM collects the "processingBacklog" periodically from all source
> >>>>> operators and uses the newly added config value as appropriate.
> >>>>
> >>>> Yes.
> >>>>
> >>>>> The challenge with this approach is that we need to define the
> >> semantics
> >>>> of
> >>>>> this "processingBacklog" metric and have all source operators
> >>>>> implement this metric. I am not sure we are able to do this yet without
> >>>>> having users explicitly provide this information on a per-source basis.
> >>>>>
> >>>>> Suppose the job read from a bounded Kafka source, should it emit
> >>>>> "processingBacklog=true"? If yes, then the job might use long
> >>>> checkpointing
> >>>>> interval even
> >>>>> if the job is asked to process data starting from now to the next 1
> >> hour.
> >>>>> If no, then the job might use the short checkpointing interval
> >>>>> even if the job is asked to re-process data starting from 7 days ago.
> >>>>
> >>>> Yes. The same can be said of your proposal. Your proposal has the very
> >> same
> >>>> issues
> >>>> that every source would have to implement it differently, most sources
> >>>> would
> >>>> have no idea how to properly calculate the new requested checkpoint
> >>>> interval,
> >>>> for those that do know how to do that, user would have to configure
> >> every
> >>>> source
> >>>> individually and yet again we would end up with a system, that works
> >> only
> >>>> partially in
> >>>> some special use cases (HybridSource), that's confusing the users even
> >>>> more.
> >>>>
> >>>> That's why I think the more generic solution, working primarily on the
> >> same
> >>>> metrics that are used by various auto scaling solutions (like Flink K8s
> >>>> operator's
> >>>> autosaler) would be better. The hacky solution I proposed to:
> >>>> 1. show you that the generic solution is simply a superset of your
> >> proposal
> >>>> 2. if you are adamant that busyness/backpressured/records processing
> >>>> rate/pending records
> >>>>   metrics wouldn't cover your use case sufficiently (imo they can),
> >> then
> >>>> you can very easily
> >>>>   enhance this algorithm with using some hints from the sources. Like
> >>>> "processingBacklog==true"
> >>>>   to short circuit the main algorithm, if `processingBacklog` is
> >>>> available.
> >>>>
> >>>> Best,
> >>>> Piotrek
> >>>>
> >>>>
> >>>> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a):
> >>>>
> >>>>> Hi again Piotr,
> >>>>>
> >>>>> Thank you for the reply. Please see my reply inline.
> >>>>>
> >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> >>>> piotr.nowoj...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi again Dong,
> >>>>>>
> >>>>>>> I understand that JM will get the backpressure-related metrics every
> >>>>> time
> >>>>>>> the RestServerEndpoint receives the REST request to get these
> >>>> metrics.
> >>>>>> But
> >>>>>>> I am not sure if RestServerEndpoint is already always receiving the
> >>>>> REST
> >>>>>>> metrics at regular interval (suppose there is no human manually
> >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
> >>>>> interval?
> >>>>>>
> >>>>>> Good catch, I've thought that metrics are pre-emptively sent to JM
> >>>> every
> >>>>> 10
> >>>>>> seconds.
> >>>>>> Indeed that's not the case at the moment, and that would have to be
> >>>>>> improved.
> >>>>>>
> >>>>>>> I would be surprised if Flink is already paying this much overhead
> >>>> just
> >>>>>> for
> >>>>>>> metrics monitoring. That is the main reason I still doubt it is true.
> >>>>> Can
> >>>>>>> you show where this 100 ms is currently configured?
> >>>>>>>
> >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke
> >>>>> the
> >>>>>>> REST API at 100 ms interval. Then that means we need to considerably
> >>>>>>> increase the network/cpu overhead at JM, where the overhead will
> >>>>> increase
> >>>>>>> as the number of TM/slots increase, which may pose risk to the
> >>>>>> scalability
> >>>>>>> of the proposed design. I am not sure we should do this. What do you
> >>>>>> think?
> >>>>>>
> >>>>>> Sorry. I didn't mean metric should be reported every 100ms. I meant
> >>>> that
> >>>>>> "backPressuredTimeMsPerSecond (metric) would report (a value of)
> >>>>> 100ms/s."
> >>>>>> once per metric interval (10s?).
> >>>>>>
> >>>>>
> >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being
> >>>>> "backpressured" at a given time (due to random traffic spikes). Then at
> >>>> any
> >>>>> given time, the chance of the job
> >>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> >> the
> >>>>> backpressure metric once a second, the estimated time for the job
> >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> >> 23163
> >>>>> sec = 6.4 hours.
> >>>>>
> >>>>> This means that the job will effectively always use the longer
> >>>>> checkpointing interval. It looks like a real concern, right?
> >>>>>
> >>>>>
> >>>>>>> - What is the interface of this CheckpointTrigger? For example, are
> >>>> we
> >>>>>>> going to give CheckpointTrigger a context that it can use to fetch
> >>>>>>> arbitrary metric values? This can help us understand what information
> >>>>>> this
> >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
> >>>> decision.
> >>>>>>
> >>>>>> I honestly don't think this is important at this stage of the
> >>>> discussion.
> >>>>>> It could have
> >>>>>> whatever interface we would deem to be best. Required things:
> >>>>>>
> >>>>>> - access to at least a subset of metrics that the given
> >>>>> `CheckpointTrigger`
> >>>>>> requests,
> >>>>>> for example via some registration mechanism, so we don't have to
> >>>> fetch
> >>>>>> all of the
> >>>>>> metrics all the time from TMs.
> >>>>>> - some way to influence `CheckpointCoordinator`. Either via manually
> >>>>>> triggering
> >>>>>> checkpoints, and/or ability to change the checkpointing interval.
> >>>>>>
> >>>>>
> >>>>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>>>> following reasons.
> >>>>>
> >>>>> We would need to know the concrete APIs to gauge the following:
> >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >>>> APIs
> >>>>> of this alternative approach be more or less usable?
> >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>>>> checkpointing interval to change dynamically even during the unbounded
> >>>>> phase) as it claims?
> >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> >>>> FLIP-309?
> >>>>>
> >>>>> For example, if the APIs of this alternative approach can be decoupled
> >>>> from
> >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>>>> work on this extra use-case with a more advanced/complicated design
> >>>>> separately in a followup work.
> >>>>>
> >>>>>
> >>>>>>> - Where is this CheckpointTrigger running? For example, is it going
> >>>> to
> >>>>>> run
> >>>>>>> on the subtask of every source operator? Or is it going to run on the
> >>>>> JM?
> >>>>>>
> >>>>>> IMO on the JM.
> >>>>>>
> >>>>>>> - Are we going to provide a default implementation of this
> >>>>>>> CheckpointTrigger in Flink that implements the algorithm described
> >>>>> below,
> >>>>>>> or do we expect each source operator developer to implement their own
> >>>>>>> CheckpointTrigger?
> >>>>>>
> >>>>>> As I mentioned before, I think we should provide at the very least the
> >>>>>> implementation
> >>>>>> that replaces the current triggering mechanism (statically configured
> >>>>>> checkpointing interval)
> >>>>>> and it would be great to provide the backpressure monitoring trigger
> >> as
> >>>>>> well.
> >>>>>>
> >>>>>
> >>>>> I agree that if there is a good use-case that can be addressed by the
> >>>>> proposed CheckpointTrigger, then it is reasonable
> >>>>> to add CheckpointTrigger and replace the current triggering mechanism
> >>>> with
> >>>>> it.
> >>>>>
> >>>>> I also agree that we will likely find such a use-case. For example,
> >>>> suppose
> >>>>> the source records have event timestamps, then it is likely
> >>>>> that we can use the trigger to dynamically control the checkpointing
> >>>>> interval based on the difference between the watermark and current
> >> system
> >>>>> time.
> >>>>>
> >>>>> But I am not sure the addition of this CheckpointTrigger should be
> >>>> coupled
> >>>>> with FLIP-309. Whether or not it is coupled probably depends on the
> >>>>> concrete API design around CheckpointTrigger.
> >>>>>
> >>>>> If you would be adamant that the backpressure monitoring doesn't cover
> >>>> well
> >>>>>> enough your use case, I would be ok to provide the hacky version that
> >> I
> >>>>>> also mentioned
> >>>>>> before:
> >>>>>
> >>>>>
> >>>>>> """
> >>>>>> Especially that if my proposed algorithm wouldn't work good enough,
> >>>> there
> >>>>>> is
> >>>>>> an obvious solution, that any source could add a metric, like let say
> >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> >>>>>> could use this as an override to always switch to the
> >>>>>> "slowCheckpointInterval". I don't think we need it, but that's always
> >>>> an
> >>>>>> option
> >>>>>> that would be basically equivalent to your original proposal.
> >>>>>> """
> >>>>>>
> >>>>>
> >>>>> Hmm.. do you mean we can do the following:
> >>>>> - Have all source operators emit a metric named "processingBacklog".
> >>>>> - Add a job-level config that specifies "the checkpointing interval to
> >> be
> >>>>> used when any source is processing backlog".
> >>>>> - The JM collects the "processingBacklog" periodically from all source
> >>>>> operators and uses the newly added config value as appropriate.
> >>>>>
> >>>>> The challenge with this approach is that we need to define the
> >> semantics
> >>>> of
> >>>>> this "processingBacklog" metric and have all source operators
> >>>>> implement this metric. I am not sure we are able to do this yet without
> >>>>> having users explicitly provide this information on a per-source basis.
> >>>>>
> >>>>> Suppose the job read from a bounded Kafka source, should it emit
> >>>>> "processingBacklog=true"? If yes, then the job might use long
> >>>> checkpointing
> >>>>> interval even
> >>>>> if the job is asked to process data starting from now to the next 1
> >> hour.
> >>>>> If no, then the job might use the short checkpointing interval
> >>>>> even if the job is asked to re-process data starting from 7 days ago.
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>> - How can users specify the
> >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> >>>>>>> For example, will we provide APIs on the CheckpointTrigger that
> >>>>> end-users
> >>>>>>> can use to specify the checkpointing interval? What would that look
> >>>>> like?
> >>>>>>
> >>>>>> Also as I mentioned before, just like metric reporters are configured:
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> >>>>>> Every CheckpointTrigger could have its own custom configuration.
> >>>>>>
> >>>>>>> Overall, my gut feel is that the alternative approach based on
> >>>>>>> CheckpointTrigger is more complicated
> >>>>>>
> >>>>>> Yes, as usual, more generic things are more complicated, but often
> >> more
> >>>>>> useful in the long run.
> >>>>>>
> >>>>>>> and harder to use.
> >>>>>>
> >>>>>> I don't agree. Why setting in config
> >>>>>>
> >>>>>> execution.checkpointing.trigger:
> >>>> BackPressureMonitoringCheckpointTrigger
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
> >>>>>> 1s
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
> >>>>>> 30s
> >>>>>>
> >>>>>> that we could even provide a shortcut to the above construct via:
> >>>>>>
> >>>>>> execution.checkpointing.fast-interval: 1s
> >>>>>> execution.checkpointing.slow-interval: 30s
> >>>>>>
> >>>>>> is harder compared to setting two/three checkpoint intervals, one in
> >>>> the
> >>>>>> config/or via `env.enableCheckpointing(x)`,
> >>>>>> secondly passing one/two (fast/slow) values on the source itself?
> >>>>>>
> >>>>>
> >>>>> If we can address the use-case by providing just the two job-level
> >> config
> >>>>> as described above, I agree it will indeed be simpler.
> >>>>>
> >>>>> I have tried to achieve this goal. But the caveat is that it requires
> >>>> much
> >>>>> more work than described above in order to give the configs
> >> well-defined
> >>>>> semantics. So I find it simpler to just use the approach in FLIP-309.
> >>>>>
> >>>>> Let me explain my concern below. It will be great if you or someone
> >> else
> >>>>> can help provide a solution.
> >>>>>
> >>>>> 1) We need to clearly document when the fast-interval and slow-interval
> >>>>> will be used so that users can derive the expected behavior of the job
> >>>> and
> >>>>> be able to config these values.
> >>>>>
> >>>>> 2) The trigger of fast/slow interval depends on the behavior of the
> >>>> source
> >>>>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source
> >>>>> operator (e.g. boundedness) can describe the target behavior. For
> >>>> example,
> >>>>> MySQL CDC internally has two phases, namely snapshot phase and binlog
> >>>>> phase, which are not explicitly exposed to its users via source
> >> operator
> >>>>> API. And we probably should not enumerate all internal phases of all
> >>>> source
> >>>>> operators that are affected by fast/slow interval.
> >>>>>
> >>>>> 3) An alternative approach might be to define a new concept (e.g.
> >>>>> processingBacklog) that is applied to all source operators. Then the
> >>>>> fast/slow interval's documentation can depend on this concept. That
> >> means
> >>>>> we have to add a top-level concept (similar to source boundedness) and
> >>>>> require all source operators to specify how they enforce this concept
> >>>> (e.g.
> >>>>> FileSystemSource always emits processingBacklog=true). And there might
> >> be
> >>>>> cases where the source itself (e.g. a bounded Kafka Source) can not
> >>>>> automatically derive the value of this concept, in which case we need
> >> to
> >>>>> provide option for users to explicitly specify the value for this
> >> concept
> >>>>> on a per-source basis.
> >>>>>
> >>>>>
> >>>>>
> >>>>>>> And it probably also has the issues of "having two places to
> >>>> configure
> >>>>>> checkpointing
> >>>>>>> interval" and "giving flexibility for every source to implement a
> >>>>>> different
> >>>>>>> API" (as mentioned below).
> >>>>>>
> >>>>>> No, it doesn't.
> >>>>>>
> >>>>>>> IMO, it is a hard-requirement for the user-facing API to be
> >>>>>>> clearly defined and users should be able to use the API without
> >>>> concern
> >>>>>> of
> >>>>>>> regression. And this requirement is more important than the other
> >>>> goals
> >>>>>>> discussed above because it is related to the stability/performance of
> >>>>> the
> >>>>>>> production job. What do you think?
> >>>>>>
> >>>>>> I don't agree with this. There are many things that work something in
> >>>>>> between perfectly and well enough
> >>>>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%),
> >>>>> while
> >>>>>> still being very useful.
> >>>>>> Good examples are: selection of state backend, unaligned checkpoints,
> >>>>>> buffer debloating but frankly if I go
> >>>>>> through list of currently available config options, something like
> >> half
> >>>>> of
> >>>>>> them can cause regressions. Heck,
> >>>>>> even Flink itself doesn't work perfectly in 100% of the use cases, due
> >>>>> to a
> >>>>>> variety of design choices. Of
> >>>>>> course, the more use cases are fine with said feature, the better, but
> >>>> we
> >>>>>> shouldn't fixate to perfectly cover
> >>>>>> 100% of the cases, as that's impossible.
> >>>>>>
> >>>>>> In this particular case, if back pressure monitoring  trigger can work
> >>>>> well
> >>>>>> enough in 95% of cases, I would
> >>>>>> say that's already better than the originally proposed alternative,
> >>>> which
> >>>>>> doesn't work at all if user has a large
> >>>>>> backlog to reprocess from Kafka, including when using HybridSource
> >>>> AFTER
> >>>>>> the switch to Kafka has
> >>>>>> happened. For the remaining 5%, we should try to improve the behaviour
> >>>>> over
> >>>>>> time, but ultimately, users can
> >>>>>> decide to just run a fixed checkpoint interval (or at worst use the
> >>>> hacky
> >>>>>> checkpoint trigger that I mentioned
> >>>>>> before a couple of times).
> >>>>>>
> >>>>>> Also to be pedantic, if a user naively selects slow-interval in your
> >>>>>> proposal to 30 minutes, when that user's
> >>>>>> job fails on average every 15-20minutes, his job can end up in a state
> >>>>> that
> >>>>>> it can not make any progress,
> >>>>>> this arguably is quite serious regression.
> >>>>>>
> >>>>>
> >>>>> I probably should not say it is "hard requirement". After all there are
> >>>>> pros/cons. We will need to consider implementation complexity,
> >> usability,
> >>>>> extensibility etc.
> >>>>>
> >>>>> I just don't think we should take it for granted to introduce
> >> regression
> >>>>> for one use-case in order to support another use-case. If we can not
> >> find
> >>>>> an algorithm/solution that addresses
> >>>>> both use-case well, I hope we can be open to tackle them separately so
> >>>> that
> >>>>> users can choose the option that best fits their needs.
> >>>>>
> >>>>> All things else being equal, I think it is preferred for user-facing
> >> API
> >>>> to
> >>>>> be clearly defined and let users should be able to use the API without
> >>>>> concern of regression.
> >>>>>
> >>>>> Maybe we can list pros/cons for the alternative approaches we have been
> >>>>> discussing and see choose the best approach. And maybe we will end up
> >>>>> finding that use-case
> >>>>> which needs CheckpointTrigger can be tackled separately from the
> >> use-case
> >>>>> in FLIP-309.
> >>>>>
> >>>>>
> >>>>>>> I am not sure if there is a typo. Because if
> >>>>> backPressuredTimeMsPerSecond
> >>>>>> =
> >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> >>>> numRecordsInPerSecond /
> >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> >>>>>>>
> >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> >>>>>> (numRecordsInPerSecond
> >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> >>>> metricsUpdateInterval"?
> >>>>>>
> >>>>>> It looks like there is indeed some mistake in my proposal above. Yours
> >>>>> look
> >>>>>> more correct, it probably
> >>>>>> still needs some safeguard/special handling if
> >>>>>> `backPressuredTimeMsPerSecond > 950`
> >>>>>>
> >>>>>>> The only information it can access is the backlog.
> >>>>>>
> >>>>>> Again no. It can access whatever we want to provide to it.
> >>>>>>
> >>>>>> Regarding the rest of your concerns. It's a matter of tweaking the
> >>>>>> parameters and the algorithm itself,
> >>>>>> and how much safety-net do we want to have. Ultimately, I'm pretty
> >> sure
> >>>>>> that's a (for 95-99% of cases)
> >>>>>> solvable problem. If not, there is always the hacky solution, that
> >>>> could
> >>>>> be
> >>>>>> even integrated into this above
> >>>>>> mentioned algorithm as a short circuit to always reach
> >> `slow-interval`.
> >>>>>>
> >>>>>> Apart of that, you picked 3 minutes as the checkpointing interval in
> >>>> your
> >>>>>> counter example. In most cases
> >>>>>> any interval above 1 minute would inflict pretty negligible overheads,
> >>>> so
> >>>>>> all in all, I would doubt there is
> >>>>>> a significant benefit (in most cases) of increasing 3 minute
> >> checkpoint
> >>>>>> interval to anything more, let alone
> >>>>>> 30 minutes.
> >>>>>>
> >>>>>
> >>>>> I am not sure we should design the algorithm with the assumption that
> >> the
> >>>>> short checkpointing interval will always be higher than 1 minute etc.
> >>>>>
> >>>>> I agree the proposed algorithm can solve most cases where the resource
> >> is
> >>>>> sufficient and there is always no backlog in source subtasks. On the
> >>>> other
> >>>>> hand, what makes SRE
> >>>>> life hard is probably the remaining 1-5% cases where the traffic is
> >> spiky
> >>>>> and the cluster is reaching its capacity limit. The ability to predict
> >>>> and
> >>>>> control Flink job's behavior (including checkpointing interval) can
> >>>>> considerably reduce the burden of manging Flink jobs.
> >>>>>
> >>>>> Best,
> >>>>> Dong
> >>>>>
> >>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Piotrek
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a):
> >>>>>>
> >>>>>>> Hi Piotr,
> >>>>>>>
> >>>>>>> Thanks for the explanations. I have some followup questions below.
> >>>>>>>
> >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi All,
> >>>>>>>>
> >>>>>>>> Thanks for chipping in the discussion Ahmed!
> >>>>>>>>
> >>>>>>>> Regarding using the REST API. Currently I'm leaning towards
> >>>>>> implementing
> >>>>>>>> this feature inside the Flink itself, via some pluggable interface.
> >>>>>>>> REST API solution would be tempting, but I guess not everyone is
> >>>>> using
> >>>>>>>> Flink Kubernetes Operator.
> >>>>>>>>
> >>>>>>>> @Dong
> >>>>>>>>
> >>>>>>>>> I am not sure metrics such as isBackPressured are already sent to
> >>>>> JM.
> >>>>>>>>
> >>>>>>>> Fetching code path on the JM:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> >>>>>>>>
> >>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> >>>>>>>>
> >>>>>>>> Example code path accessing Task level metrics via JM using the
> >>>>>>>> `MetricStore`:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> >>>>>>>>
> >>>>>>>
> >>>>>>> Thanks for the code reference. I checked the code that invoked these
> >>>>> two
> >>>>>>> classes and found the following information:
> >>>>>>>
> >>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis currently invoked
> >>>> only
> >>>>>>> when AggregatingJobsMetricsHandler is invoked.
> >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by
> >>>>>>> WebMonitorEndpoint#initializeHandlers
> >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by
> >>>>>> RestServerEndpoint.
> >>>>>>> And RestServerEndpoint invokes these handlers in response to external
> >>>>>> REST
> >>>>>>> request.
> >>>>>>>
> >>>>>>> I understand that JM will get the backpressure-related metrics every
> >>>>> time
> >>>>>>> the RestServerEndpoint receives the REST request to get these
> >>>> metrics.
> >>>>>> But
> >>>>>>> I am not sure if RestServerEndpoint is already always receiving the
> >>>>> REST
> >>>>>>> metrics at regular interval (suppose there is no human manually
> >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
> >>>>> interval?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>>> For example, let's say every source operator subtask reports this
> >>>>>>> metric
> >>>>>>>> to
> >>>>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each
> >>>>>>> subtask
> >>>>>>>>> is backpressured roughly 10% of the total time due to traffic
> >>>>> spikes
> >>>>>>> (and
> >>>>>>>>> limited buffer). Then at any given time, there are 1 - 0.9^100 =
> >>>>>>> 99.997%
> >>>>>>>>> chance that there is at least one subtask that is backpressured.
> >>>>> Then
> >>>>>>> we
> >>>>>>>>> have to wait for at least 10 seconds to check again.
> >>>>>>>>
> >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like
> >>>>>>>> busyTimeMsPerSecond) are not subject to that problem.
> >>>>>>>> They are recalculated once every metric fetching interval, and they
> >>>>>>> report
> >>>>>>>> accurately on average the given subtask spent
> >>>>>> busy/idling/backpressured.
> >>>>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s.
> >>>>>>>
> >>>>>>>
> >>>>>>> Suppose every subtask is already reporting
> >>>> backPressuredTimeMsPerSecond
> >>>>>> to
> >>>>>>> JM once every 100 ms. If a job has 10 operators (that are not
> >>>> chained)
> >>>>>> and
> >>>>>>> each operator has 100 subtasks, then JM would need to handle 10000
> >>>>>> requests
> >>>>>>> per second to receive metrics from these 1000 subtasks. It seems
> >>>> like a
> >>>>>>> non-trivial overhead for medium-to-large sized jobs and can make JM
> >>>> the
> >>>>>>> performance bottleneck during job execution.
> >>>>>>>
> >>>>>>> I would be surprised if Flink is already paying this much overhead
> >>>> just
> >>>>>> for
> >>>>>>> metrics monitoring. That is the main reason I still doubt it is true.
> >>>>> Can
> >>>>>>> you show where this 100 ms is currently configured?
> >>>>>>>
> >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke
> >>>>> the
> >>>>>>> REST API at 100 ms interval. Then that means we need to considerably
> >>>>>>> increase the network/cpu overhead at JM, where the overhead will
> >>>>> increase
> >>>>>>> as the number of TM/slots increase, which may pose risk to the
> >>>>>> scalability
> >>>>>>> of the proposed design. I am not sure we should do this. What do you
> >>>>>> think?
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> While it will be nice to support additional use-cases
> >>>>>>>>> with one proposal, it is probably also reasonable to make
> >>>>> incremental
> >>>>>>>>> progress and support the low-hanging-fruit use-case first. The
> >>>>> choice
> >>>>>>>>> really depends on the complexity and the importance of supporting
> >>>>> the
> >>>>>>>> extra
> >>>>>>>>> use-cases.
> >>>>>>>>
> >>>>>>>> That would be true, if that was a private implementation detail or
> >>>> if
> >>>>>> the
> >>>>>>>> low-hanging-fruit-solution would be on the direct path to the final
> >>>>>>>> solution.
> >>>>>>>> That's unfortunately not the case here. This will add public facing
> >>>>>> API,
> >>>>>>>> that we will later need to maintain, no matter what the final
> >>>>> solution
> >>>>>>> will
> >>>>>>>> be,
> >>>>>>>> and at the moment at least I don't see it being related to a
> >>>>> "perfect"
> >>>>>>>> solution.
> >>>>>>>
> >>>>>>>
> >>>>>>> Sure. Then let's decide the final solution first.
> >>>>>>>
> >>>>>>>
> >>>>>>>>> I guess the point is that the suggested approach, which
> >>>> dynamically
> >>>>>>>>> determines the checkpointing interval based on the backpressure,
> >>>>> may
> >>>>>>>> cause
> >>>>>>>>> regression when the checkpointing interval is relatively low.
> >>>> This
> >>>>>>> makes
> >>>>>>>> it
> >>>>>>>>> hard for users to enable this feature in production. It is like
> >>>> an
> >>>>>>>>> auto-driving system that is not guaranteed to work
> >>>>>>>>
> >>>>>>>> Yes, creating a more generic solution that would require less
> >>>>>>> configuration
> >>>>>>>> is usually more difficult then static configurations.
> >>>>>>>> It doesn't mean we shouldn't try to do that. Especially that if my
> >>>>>>> proposed
> >>>>>>>> algorithm wouldn't work good enough, there is
> >>>>>>>> an obvious solution, that any source could add a metric, like let
> >>>> say
> >>>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> >>>>>>>> could use this as an override to always switch to the
> >>>>>>>> "slowCheckpointInterval". I don't think we need it, but that's
> >>>> always
> >>>>>> an
> >>>>>>>> option
> >>>>>>>> that would be basically equivalent to your original proposal. Or
> >>>> even
> >>>>>>>> source could add "suggestedCheckpointInterval : int", and
> >>>>>>>> `CheckpointTrigger` could use that value if present as a hint in
> >>>> one
> >>>>>> way
> >>>>>>> or
> >>>>>>>> another.
> >>>>>>>>
> >>>>>>>
> >>>>>>> So far we have talked about the possibility of using
> >>>> CheckpointTrigger
> >>>>>> and
> >>>>>>> mentioned the CheckpointTrigger
> >>>>>>> and read metric values.
> >>>>>>>
> >>>>>>> Can you help answer the following questions so that I can understand
> >>>>> the
> >>>>>>> alternative solution more concretely:
> >>>>>>>
> >>>>>>> - What is the interface of this CheckpointTrigger? For example, are
> >>>> we
> >>>>>>> going to give CheckpointTrigger a context that it can use to fetch
> >>>>>>> arbitrary metric values? This can help us understand what information
> >>>>>> this
> >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
> >>>> decision.
> >>>>>>> - Where is this CheckpointTrigger running? For example, is it going
> >>>> to
> >>>>>> run
> >>>>>>> on the subtask of every source operator? Or is it going to run on the
> >>>>> JM?
> >>>>>>> - Are we going to provide a default implementation of this
> >>>>>>> CheckpointTrigger in Flink that implements the algorithm described
> >>>>> below,
> >>>>>>> or do we expect each source operator developer to implement their own
> >>>>>>> CheckpointTrigger?
> >>>>>>> - How can users specify the
> >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> >>>>>>> For example, will we provide APIs on the CheckpointTrigger that
> >>>>> end-users
> >>>>>>> can use to specify the checkpointing interval? What would that look
> >>>>> like?
> >>>>>>>
> >>>>>>> Overall, my gut feel is that the alternative approach based on
> >>>>>>> CheckpointTrigger is more complicated and harder to use. And it
> >>>>> probably
> >>>>>>> also has the issues of "having two places to configure checkpointing
> >>>>>>> interval" and "giving flexibility for every source to implement a
> >>>>>> different
> >>>>>>> API" (as mentioned below).
> >>>>>>>
> >>>>>>> Maybe we can evaluate it more after knowing the answers to the above
> >>>>>>> questions.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> On the other hand, the approach currently proposed in the FLIP is
> >>>>>> much
> >>>>>>>>> simpler as it does not depend on backpressure. Users specify the
> >>>>>> extra
> >>>>>>>>> interval requirement on the specific sources (e.g. HybridSource,
> >>>>>> MySQL
> >>>>>>>> CDC
> >>>>>>>>> Source) and can easily know the checkpointing interval will be
> >>>> used
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>> continuous phase of the corresponding source. This is pretty much
> >>>>>> same
> >>>>>>> as
> >>>>>>>>> how users use the existing execution.checkpointing.interval
> >>>> config.
> >>>>>> So
> >>>>>>>>> there is no extra concern of regression caused by this approach.
> >>>>>>>>
> >>>>>>>> To an extent, but as I have already previously mentioned I really
> >>>>>> really
> >>>>>>> do
> >>>>>>>> not like idea of:
> >>>>>>>> - having two places to configure checkpointing interval (config
> >>>>> file
> >>>>>>> and
> >>>>>>>> in the Source builders)
> >>>>>>>> - giving flexibility for every source to implement a different
> >>>> API
> >>>>>> for
> >>>>>>>> that purpose
> >>>>>>>> - creating a solution that is not generic enough, so that we will
> >>>>>> need
> >>>>>>> a
> >>>>>>>> completely different mechanism in the future anyway
> >>>>>>>>
> >>>>>>>
> >>>>>>> Yeah, I understand different developers might have different
> >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a
> >>>>> perfect
> >>>>>>> solution and we have to choose based on the pros/cons of these
> >>>>> solutions.
> >>>>>>>
> >>>>>>> I agree with you that, all things being equal, it is preferable to 1)
> >>>>>> have
> >>>>>>> one place to configure checkpointing intervals, 2) have all source
> >>>>>>> operators use the same API, and 3) create a solution that is generic
> >>>>> and
> >>>>>>> last lasting. Note that these three goals affects the usability and
> >>>>>>> extensibility of the API, but not necessarily the
> >>>> stability/performance
> >>>>>> of
> >>>>>>> the production job.
> >>>>>>>
> >>>>>>> BTW, there are also other preferrable goals. For example, it is very
> >>>>>> useful
> >>>>>>> for the job's behavior to be predictable and interpretable so that
> >>>> SRE
> >>>>>> can
> >>>>>>> operator/debug the Flink in an easier way. We can list these
> >>>> pros/cons
> >>>>>>> altogether later.
> >>>>>>>
> >>>>>>> I am wondering if we can first agree on the priority of goals we want
> >>>>> to
> >>>>>>> achieve. IMO, it is a hard-requirement for the user-facing API to be
> >>>>>>> clearly defined and users should be able to use the API without
> >>>> concern
> >>>>>> of
> >>>>>>> regression. And this requirement is more important than the other
> >>>> goals
> >>>>>>> discussed above because it is related to the stability/performance of
> >>>>> the
> >>>>>>> production job. What do you think?
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> Sounds good. Looking forward to learning more ideas.
> >>>>>>>>
> >>>>>>>> I have thought about this a bit more, and I think we don't need to
> >>>>>> check
> >>>>>>>> for the backpressure status, or how much overloaded all of the
> >>>>>> operators
> >>>>>>>> are.
> >>>>>>>> We could just check three things for source operators:
> >>>>>>>> 1. pendingRecords (backlog length)
> >>>>>>>> 2. numRecordsInPerSecond
> >>>>>>>> 3. backPressuredTimeMsPerSecond
> >>>>>>>>
> >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
> >>>>>>>> // Next line calculates how many records can we consume from the
> >>>>>> backlog,
> >>>>>>>> assuming
> >>>>>>>> // that magically the reason behind a backpressure vanishes. We
> >>>> will
> >>>>>> use
> >>>>>>>> this only as
> >>>>>>>> // a safeguard  against scenarios like for example if backpressure
> >>>>> was
> >>>>>>>> caused by some
> >>>>>>>> // intermittent failure/performance degradation.
> >>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
> >>>>> (1000
> >>>>>>>> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
> >>>>>>>>
> >>>>>>>
> >>>>>>> I am not sure if there is a typo. Because if
> >>>>>> backPressuredTimeMsPerSecond =
> >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> >>>> numRecordsInPerSecond /
> >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> >>>>>>>
> >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> >>>>>> (numRecordsInPerSecond
> >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> >>>> metricsUpdateInterval"?
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the
> >>>>>>> backlog
> >>>>>>>> as
> >>>>>>>> // a safeguard against an intermittent back pressure problems, so
> >>>>> that
> >>>>>> we
> >>>>>>>> don't
> >>>>>>>> // calculate next checkpoint interval far far in the future, while
> >>>>> the
> >>>>>>>> backpressure
> >>>>>>>> // goes away before we will recalculate metrics and new
> >>>> checkpointing
> >>>>>>>> interval
> >>>>>>>> timeToConsumeBacklog = (pendingRecords -
> >>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Then we can use those numbers to calculate desired checkpointed
> >>>>>> interval
> >>>>>>>> for example like this:
> >>>>>>>>
> >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
> >>>> //this
> >>>>>> may
> >>>>>>>> need some refining
> >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
> >>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
> >>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> >>>>>>>>
> >>>>>>>> WDYT?
> >>>>>>>
> >>>>>>>
> >>>>>>> I think the idea of the above algorithm is to incline to use the
> >>>>>>> fastCheckpointInterval unless we are very sure the backlog will take
> >>>> a
> >>>>>> long
> >>>>>>> time to process. This can alleviate the concern of regression during
> >>>>> the
> >>>>>>> continuous_bounded phase since we are more likely to use the
> >>>>>>> fastCheckpointInterval. However, it can cause regression during the
> >>>>>> bounded
> >>>>>>> phase.
> >>>>>>>
> >>>>>>> I will use a concrete example to explain the risk of regression:
> >>>>>>> - The user is using HybridSource to read from HDFS followed by Kafka.
> >>>>> The
> >>>>>>> data in HDFS is old and there is no need for data freshness for the
> >>>>> data
> >>>>>> in
> >>>>>>> HDFS.
> >>>>>>> - The user configures the job as below:
> >>>>>>> - fastCheckpointInterval = 3 minutes
> >>>>>>> - slowCheckpointInterval = 30 minutes
> >>>>>>> - metricsUpdateInterval = 100 ms
> >>>>>>>
> >>>>>>> Using the above formulate, we can know that once pendingRecords
> >>>>>>> <= numRecordsInPerSecond * 30-minutes, then
> >>>>> calculatedCheckpointInterval
> >>>>>> <=
> >>>>>>> 3 minutes, meaning that we will use slowCheckpointInterval as the
> >>>>>>> checkpointing interval. Then in the last 30 minutes of the bounded
> >>>>> phase,
> >>>>>>> the checkpointing frequency will be 10X higher than what the user
> >>>>> wants.
> >>>>>>>
> >>>>>>> Also note that the same issue would also considerably limit the
> >>>>> benefits
> >>>>>> of
> >>>>>>> the algorithm. For example, during the continuous phase, the
> >>>> algorithm
> >>>>>> will
> >>>>>>> only be better than the approach in FLIP-309 when there is at least
> >>>>>>> 30-minutes worth of backlog in the source.
> >>>>>>>
> >>>>>>> Sure, having a slower checkpointing interval in this extreme case
> >>>>> (where
> >>>>>>> there is 30-minutes backlog in the continous-unbounded phase) is
> >>>> still
> >>>>>>> useful when this happens. But since this is the un-common case, and
> >>>> the
> >>>>>>> right solution is probably to do capacity planning to avoid this from
> >>>>>>> happening in the first place, I am not sure it is worth optimizing
> >>>> for
> >>>>>> this
> >>>>>>> case at the cost of regression in the bounded phase and the reduced
> >>>>>>> operational predictability for users (e.g. what checkpointing
> >>>> interval
> >>>>>>> should I expect at this stage of the job).
> >>>>>>>
> >>>>>>> I think the fundamental issue with this algorithm is that it is
> >>>> applied
> >>>>>> to
> >>>>>>> both the bounded phases and the continous_unbounded phases without
> >>>>>> knowing
> >>>>>>> which phase the job is running at. The only information it can access
> >>>>> is
> >>>>>>> the backlog. But two sources with the same amount of backlog do not
> >>>>>>> necessarily mean they have the same data freshness requirement.
> >>>>>>>
> >>>>>>> In this particular example, users know that the data in HDFS is very
> >>>>> old
> >>>>>>> and there is no need for data freshness. Users can express signals
> >>>> via
> >>>>>> the
> >>>>>>> per-source API proposed in the FLIP. This is why the current approach
> >>>>> in
> >>>>>>> FLIP-309 can be better in this case.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Piotrek
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
>
>

Reply via email to