Thank you Dong for driving this FLIP. 

The new design looks good to me!

Best,
Jark

> 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道:
> 
> Thank you Leonard for the review!
> 
> Hi Piotr, do you have any comments on the latest proposal?
> 
> I am wondering if it is OK to start the voting thread this week.
> 
> On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote:
> 
>> Thanks Dong for driving this FLIP forward!
>> 
>> Introducing  `backlog status` concept for flink job makes sense to me as
>> following reasons:
>> 
>> From concept/API design perspective, it’s more general and natural than
>> above proposals as it can be used in HybridSource for bounded records, CDC
>> Source for history snapshot and general sources like KafkaSource for
>> historical messages.
>> 
>> From user cases/requirements, I’ve seen many users manually to set larger
>> checkpoint interval during backfilling and then set a shorter checkpoint
>> interval for real-time processing in their production environments as a
>> flink application optimization. Now, the flink framework can make this
>> optimization no longer require the user to set the checkpoint interval and
>> restart the job multiple times.
>> 
>> Following supporting using larger checkpoint for job under backlog status
>> in current FLIP, we can explore supporting larger parallelism/memory/cpu
>> for job under backlog status in the future.
>> 
>> In short, the updated FLIP looks good to me.
>> 
>> 
>> Best,
>> Leonard
>> 
>> 
>>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks again for proposing the isProcessingBacklog concept.
>>> 
>>> After discussing with Becket Qin and thinking about this more, I agree it
>>> is a better idea to add a top-level concept to all source operators to
>>> address the target use-case.
>>> 
>>> The main reason that changed my mind is that isProcessingBacklog can be
>>> described as an inherent/nature attribute of every source instance and
>> its
>>> semantics does not need to depend on any specific checkpointing policy.
>>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
>>> have considered so far (e.g. HybridSource and MySQL CDC source) without
>>> asking users to explicitly configure the per-source behavior, which
>> indeed
>>> provides better user experience.
>>> 
>>> I have updated the FLIP based on the latest suggestions. The latest FLIP
>> no
>>> longer introduces per-source config that can be used by end-users. While
>> I
>>> agree with you that CheckpointTrigger can be a useful feature to address
>>> additional use-cases, I am not sure it is necessary for the use-case
>>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
>>> in another FLIP?
>>> 
>>> Can you help take another look at the updated FLIP?
>>> 
>>> Best,
>>> Dong
>>> 
>>> 
>>> 
>>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>> 
>>>> Hi Dong,
>>>> 
>>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being
>>>>> "backpressured" at a given time (due to random traffic spikes). Then at
>>>> any
>>>>> given time, the chance of the job
>>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
>> the
>>>>> backpressure metric once a second, the estimated time for the job
>>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
>> 23163
>>>>> sec = 6.4 hours.
>>>>> 
>>>>> This means that the job will effectively always use the longer
>>>>> checkpointing interval. It looks like a real concern, right?
>>>> 
>>>> Sorry I don't understand where you are getting those numbers from.
>>>> Instead of trying to find loophole after loophole, could you try to
>> think
>>>> how a given loophole could be improved/solved?
>>>> 
>>>>> Hmm... I honestly think it will be useful to know the APIs due to the
>>>>> following reasons.
>>>> 
>>>> Please propose something. I don't think it's needed.
>>>> 
>>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
>>>> APIs
>>>>> of this alternative approach be more or less usable?
>>>> 
>>>> Everything that you originally wanted to achieve in FLIP-309, you could
>> do
>>>> as well in my proposal.
>>>> Vide my many mentions of the "hacky solution".
>>>> 
>>>>> - Can these APIs reliably address the extra use-case (e.g. allow
>>>>> checkpointing interval to change dynamically even during the unbounded
>>>>> phase) as it claims?
>>>> 
>>>> I don't see why not.
>>>> 
>>>>> - Can these APIs be decoupled from the APIs currently proposed in
>>>> FLIP-309?
>>>> 
>>>> Yes
>>>> 
>>>>> For example, if the APIs of this alternative approach can be decoupled
>>>> from
>>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to
>>>>> work on this extra use-case with a more advanced/complicated design
>>>>> separately in a followup work.
>>>> 
>>>> As I voiced my concerns previously, the current design of FLIP-309 would
>>>> clog the public API and in the long run confuse the users. IMO It's
>>>> addressing the
>>>> problem in the wrong place.
>>>> 
>>>>> Hmm.. do you mean we can do the following:
>>>>> - Have all source operators emit a metric named "processingBacklog".
>>>>> - Add a job-level config that specifies "the checkpointing interval to
>> be
>>>>> used when any source is processing backlog".
>>>>> - The JM collects the "processingBacklog" periodically from all source
>>>>> operators and uses the newly added config value as appropriate.
>>>> 
>>>> Yes.
>>>> 
>>>>> The challenge with this approach is that we need to define the
>> semantics
>>>> of
>>>>> this "processingBacklog" metric and have all source operators
>>>>> implement this metric. I am not sure we are able to do this yet without
>>>>> having users explicitly provide this information on a per-source basis.
>>>>> 
>>>>> Suppose the job read from a bounded Kafka source, should it emit
>>>>> "processingBacklog=true"? If yes, then the job might use long
>>>> checkpointing
>>>>> interval even
>>>>> if the job is asked to process data starting from now to the next 1
>> hour.
>>>>> If no, then the job might use the short checkpointing interval
>>>>> even if the job is asked to re-process data starting from 7 days ago.
>>>> 
>>>> Yes. The same can be said of your proposal. Your proposal has the very
>> same
>>>> issues
>>>> that every source would have to implement it differently, most sources
>>>> would
>>>> have no idea how to properly calculate the new requested checkpoint
>>>> interval,
>>>> for those that do know how to do that, user would have to configure
>> every
>>>> source
>>>> individually and yet again we would end up with a system, that works
>> only
>>>> partially in
>>>> some special use cases (HybridSource), that's confusing the users even
>>>> more.
>>>> 
>>>> That's why I think the more generic solution, working primarily on the
>> same
>>>> metrics that are used by various auto scaling solutions (like Flink K8s
>>>> operator's
>>>> autosaler) would be better. The hacky solution I proposed to:
>>>> 1. show you that the generic solution is simply a superset of your
>> proposal
>>>> 2. if you are adamant that busyness/backpressured/records processing
>>>> rate/pending records
>>>>   metrics wouldn't cover your use case sufficiently (imo they can),
>> then
>>>> you can very easily
>>>>   enhance this algorithm with using some hints from the sources. Like
>>>> "processingBacklog==true"
>>>>   to short circuit the main algorithm, if `processingBacklog` is
>>>> available.
>>>> 
>>>> Best,
>>>> Piotrek
>>>> 
>>>> 
>>>> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a):
>>>> 
>>>>> Hi again Piotr,
>>>>> 
>>>>> Thank you for the reply. Please see my reply inline.
>>>>> 
>>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
>>>> piotr.nowoj...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi again Dong,
>>>>>> 
>>>>>>> I understand that JM will get the backpressure-related metrics every
>>>>> time
>>>>>>> the RestServerEndpoint receives the REST request to get these
>>>> metrics.
>>>>>> But
>>>>>>> I am not sure if RestServerEndpoint is already always receiving the
>>>>> REST
>>>>>>> metrics at regular interval (suppose there is no human manually
>>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
>>>>> interval?
>>>>>> 
>>>>>> Good catch, I've thought that metrics are pre-emptively sent to JM
>>>> every
>>>>> 10
>>>>>> seconds.
>>>>>> Indeed that's not the case at the moment, and that would have to be
>>>>>> improved.
>>>>>> 
>>>>>>> I would be surprised if Flink is already paying this much overhead
>>>> just
>>>>>> for
>>>>>>> metrics monitoring. That is the main reason I still doubt it is true.
>>>>> Can
>>>>>>> you show where this 100 ms is currently configured?
>>>>>>> 
>>>>>>> Alternatively, maybe you mean that we should add extra code to invoke
>>>>> the
>>>>>>> REST API at 100 ms interval. Then that means we need to considerably
>>>>>>> increase the network/cpu overhead at JM, where the overhead will
>>>>> increase
>>>>>>> as the number of TM/slots increase, which may pose risk to the
>>>>>> scalability
>>>>>>> of the proposed design. I am not sure we should do this. What do you
>>>>>> think?
>>>>>> 
>>>>>> Sorry. I didn't mean metric should be reported every 100ms. I meant
>>>> that
>>>>>> "backPressuredTimeMsPerSecond (metric) would report (a value of)
>>>>> 100ms/s."
>>>>>> once per metric interval (10s?).
>>>>>> 
>>>>> 
>>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being
>>>>> "backpressured" at a given time (due to random traffic spikes). Then at
>>>> any
>>>>> given time, the chance of the job
>>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
>> the
>>>>> backpressure metric once a second, the estimated time for the job
>>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
>> 23163
>>>>> sec = 6.4 hours.
>>>>> 
>>>>> This means that the job will effectively always use the longer
>>>>> checkpointing interval. It looks like a real concern, right?
>>>>> 
>>>>> 
>>>>>>> - What is the interface of this CheckpointTrigger? For example, are
>>>> we
>>>>>>> going to give CheckpointTrigger a context that it can use to fetch
>>>>>>> arbitrary metric values? This can help us understand what information
>>>>>> this
>>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
>>>> decision.
>>>>>> 
>>>>>> I honestly don't think this is important at this stage of the
>>>> discussion.
>>>>>> It could have
>>>>>> whatever interface we would deem to be best. Required things:
>>>>>> 
>>>>>> - access to at least a subset of metrics that the given
>>>>> `CheckpointTrigger`
>>>>>> requests,
>>>>>> for example via some registration mechanism, so we don't have to
>>>> fetch
>>>>>> all of the
>>>>>> metrics all the time from TMs.
>>>>>> - some way to influence `CheckpointCoordinator`. Either via manually
>>>>>> triggering
>>>>>> checkpoints, and/or ability to change the checkpointing interval.
>>>>>> 
>>>>> 
>>>>> Hmm... I honestly think it will be useful to know the APIs due to the
>>>>> following reasons.
>>>>> 
>>>>> We would need to know the concrete APIs to gauge the following:
>>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
>>>> APIs
>>>>> of this alternative approach be more or less usable?
>>>>> - Can these APIs reliably address the extra use-case (e.g. allow
>>>>> checkpointing interval to change dynamically even during the unbounded
>>>>> phase) as it claims?
>>>>> - Can these APIs be decoupled from the APIs currently proposed in
>>>> FLIP-309?
>>>>> 
>>>>> For example, if the APIs of this alternative approach can be decoupled
>>>> from
>>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to
>>>>> work on this extra use-case with a more advanced/complicated design
>>>>> separately in a followup work.
>>>>> 
>>>>> 
>>>>>>> - Where is this CheckpointTrigger running? For example, is it going
>>>> to
>>>>>> run
>>>>>>> on the subtask of every source operator? Or is it going to run on the
>>>>> JM?
>>>>>> 
>>>>>> IMO on the JM.
>>>>>> 
>>>>>>> - Are we going to provide a default implementation of this
>>>>>>> CheckpointTrigger in Flink that implements the algorithm described
>>>>> below,
>>>>>>> or do we expect each source operator developer to implement their own
>>>>>>> CheckpointTrigger?
>>>>>> 
>>>>>> As I mentioned before, I think we should provide at the very least the
>>>>>> implementation
>>>>>> that replaces the current triggering mechanism (statically configured
>>>>>> checkpointing interval)
>>>>>> and it would be great to provide the backpressure monitoring trigger
>> as
>>>>>> well.
>>>>>> 
>>>>> 
>>>>> I agree that if there is a good use-case that can be addressed by the
>>>>> proposed CheckpointTrigger, then it is reasonable
>>>>> to add CheckpointTrigger and replace the current triggering mechanism
>>>> with
>>>>> it.
>>>>> 
>>>>> I also agree that we will likely find such a use-case. For example,
>>>> suppose
>>>>> the source records have event timestamps, then it is likely
>>>>> that we can use the trigger to dynamically control the checkpointing
>>>>> interval based on the difference between the watermark and current
>> system
>>>>> time.
>>>>> 
>>>>> But I am not sure the addition of this CheckpointTrigger should be
>>>> coupled
>>>>> with FLIP-309. Whether or not it is coupled probably depends on the
>>>>> concrete API design around CheckpointTrigger.
>>>>> 
>>>>> If you would be adamant that the backpressure monitoring doesn't cover
>>>> well
>>>>>> enough your use case, I would be ok to provide the hacky version that
>> I
>>>>>> also mentioned
>>>>>> before:
>>>>> 
>>>>> 
>>>>>> """
>>>>>> Especially that if my proposed algorithm wouldn't work good enough,
>>>> there
>>>>>> is
>>>>>> an obvious solution, that any source could add a metric, like let say
>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
>>>>>> could use this as an override to always switch to the
>>>>>> "slowCheckpointInterval". I don't think we need it, but that's always
>>>> an
>>>>>> option
>>>>>> that would be basically equivalent to your original proposal.
>>>>>> """
>>>>>> 
>>>>> 
>>>>> Hmm.. do you mean we can do the following:
>>>>> - Have all source operators emit a metric named "processingBacklog".
>>>>> - Add a job-level config that specifies "the checkpointing interval to
>> be
>>>>> used when any source is processing backlog".
>>>>> - The JM collects the "processingBacklog" periodically from all source
>>>>> operators and uses the newly added config value as appropriate.
>>>>> 
>>>>> The challenge with this approach is that we need to define the
>> semantics
>>>> of
>>>>> this "processingBacklog" metric and have all source operators
>>>>> implement this metric. I am not sure we are able to do this yet without
>>>>> having users explicitly provide this information on a per-source basis.
>>>>> 
>>>>> Suppose the job read from a bounded Kafka source, should it emit
>>>>> "processingBacklog=true"? If yes, then the job might use long
>>>> checkpointing
>>>>> interval even
>>>>> if the job is asked to process data starting from now to the next 1
>> hour.
>>>>> If no, then the job might use the short checkpointing interval
>>>>> even if the job is asked to re-process data starting from 7 days ago.
>>>>> 
>>>>> 
>>>>>> 
>>>>>>> - How can users specify the
>>>>>> fastCheckpointInterval/slowCheckpointInterval?
>>>>>>> For example, will we provide APIs on the CheckpointTrigger that
>>>>> end-users
>>>>>>> can use to specify the checkpointing interval? What would that look
>>>>> like?
>>>>>> 
>>>>>> Also as I mentioned before, just like metric reporters are configured:
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
>>>>>> Every CheckpointTrigger could have its own custom configuration.
>>>>>> 
>>>>>>> Overall, my gut feel is that the alternative approach based on
>>>>>>> CheckpointTrigger is more complicated
>>>>>> 
>>>>>> Yes, as usual, more generic things are more complicated, but often
>> more
>>>>>> useful in the long run.
>>>>>> 
>>>>>>> and harder to use.
>>>>>> 
>>>>>> I don't agree. Why setting in config
>>>>>> 
>>>>>> execution.checkpointing.trigger:
>>>> BackPressureMonitoringCheckpointTrigger
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
>>>>>> 1s
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
>>>>>> 30s
>>>>>> 
>>>>>> that we could even provide a shortcut to the above construct via:
>>>>>> 
>>>>>> execution.checkpointing.fast-interval: 1s
>>>>>> execution.checkpointing.slow-interval: 30s
>>>>>> 
>>>>>> is harder compared to setting two/three checkpoint intervals, one in
>>>> the
>>>>>> config/or via `env.enableCheckpointing(x)`,
>>>>>> secondly passing one/two (fast/slow) values on the source itself?
>>>>>> 
>>>>> 
>>>>> If we can address the use-case by providing just the two job-level
>> config
>>>>> as described above, I agree it will indeed be simpler.
>>>>> 
>>>>> I have tried to achieve this goal. But the caveat is that it requires
>>>> much
>>>>> more work than described above in order to give the configs
>> well-defined
>>>>> semantics. So I find it simpler to just use the approach in FLIP-309.
>>>>> 
>>>>> Let me explain my concern below. It will be great if you or someone
>> else
>>>>> can help provide a solution.
>>>>> 
>>>>> 1) We need to clearly document when the fast-interval and slow-interval
>>>>> will be used so that users can derive the expected behavior of the job
>>>> and
>>>>> be able to config these values.
>>>>> 
>>>>> 2) The trigger of fast/slow interval depends on the behavior of the
>>>> source
>>>>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source
>>>>> operator (e.g. boundedness) can describe the target behavior. For
>>>> example,
>>>>> MySQL CDC internally has two phases, namely snapshot phase and binlog
>>>>> phase, which are not explicitly exposed to its users via source
>> operator
>>>>> API. And we probably should not enumerate all internal phases of all
>>>> source
>>>>> operators that are affected by fast/slow interval.
>>>>> 
>>>>> 3) An alternative approach might be to define a new concept (e.g.
>>>>> processingBacklog) that is applied to all source operators. Then the
>>>>> fast/slow interval's documentation can depend on this concept. That
>> means
>>>>> we have to add a top-level concept (similar to source boundedness) and
>>>>> require all source operators to specify how they enforce this concept
>>>> (e.g.
>>>>> FileSystemSource always emits processingBacklog=true). And there might
>> be
>>>>> cases where the source itself (e.g. a bounded Kafka Source) can not
>>>>> automatically derive the value of this concept, in which case we need
>> to
>>>>> provide option for users to explicitly specify the value for this
>> concept
>>>>> on a per-source basis.
>>>>> 
>>>>> 
>>>>> 
>>>>>>> And it probably also has the issues of "having two places to
>>>> configure
>>>>>> checkpointing
>>>>>>> interval" and "giving flexibility for every source to implement a
>>>>>> different
>>>>>>> API" (as mentioned below).
>>>>>> 
>>>>>> No, it doesn't.
>>>>>> 
>>>>>>> IMO, it is a hard-requirement for the user-facing API to be
>>>>>>> clearly defined and users should be able to use the API without
>>>> concern
>>>>>> of
>>>>>>> regression. And this requirement is more important than the other
>>>> goals
>>>>>>> discussed above because it is related to the stability/performance of
>>>>> the
>>>>>>> production job. What do you think?
>>>>>> 
>>>>>> I don't agree with this. There are many things that work something in
>>>>>> between perfectly and well enough
>>>>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%),
>>>>> while
>>>>>> still being very useful.
>>>>>> Good examples are: selection of state backend, unaligned checkpoints,
>>>>>> buffer debloating but frankly if I go
>>>>>> through list of currently available config options, something like
>> half
>>>>> of
>>>>>> them can cause regressions. Heck,
>>>>>> even Flink itself doesn't work perfectly in 100% of the use cases, due
>>>>> to a
>>>>>> variety of design choices. Of
>>>>>> course, the more use cases are fine with said feature, the better, but
>>>> we
>>>>>> shouldn't fixate to perfectly cover
>>>>>> 100% of the cases, as that's impossible.
>>>>>> 
>>>>>> In this particular case, if back pressure monitoring  trigger can work
>>>>> well
>>>>>> enough in 95% of cases, I would
>>>>>> say that's already better than the originally proposed alternative,
>>>> which
>>>>>> doesn't work at all if user has a large
>>>>>> backlog to reprocess from Kafka, including when using HybridSource
>>>> AFTER
>>>>>> the switch to Kafka has
>>>>>> happened. For the remaining 5%, we should try to improve the behaviour
>>>>> over
>>>>>> time, but ultimately, users can
>>>>>> decide to just run a fixed checkpoint interval (or at worst use the
>>>> hacky
>>>>>> checkpoint trigger that I mentioned
>>>>>> before a couple of times).
>>>>>> 
>>>>>> Also to be pedantic, if a user naively selects slow-interval in your
>>>>>> proposal to 30 minutes, when that user's
>>>>>> job fails on average every 15-20minutes, his job can end up in a state
>>>>> that
>>>>>> it can not make any progress,
>>>>>> this arguably is quite serious regression.
>>>>>> 
>>>>> 
>>>>> I probably should not say it is "hard requirement". After all there are
>>>>> pros/cons. We will need to consider implementation complexity,
>> usability,
>>>>> extensibility etc.
>>>>> 
>>>>> I just don't think we should take it for granted to introduce
>> regression
>>>>> for one use-case in order to support another use-case. If we can not
>> find
>>>>> an algorithm/solution that addresses
>>>>> both use-case well, I hope we can be open to tackle them separately so
>>>> that
>>>>> users can choose the option that best fits their needs.
>>>>> 
>>>>> All things else being equal, I think it is preferred for user-facing
>> API
>>>> to
>>>>> be clearly defined and let users should be able to use the API without
>>>>> concern of regression.
>>>>> 
>>>>> Maybe we can list pros/cons for the alternative approaches we have been
>>>>> discussing and see choose the best approach. And maybe we will end up
>>>>> finding that use-case
>>>>> which needs CheckpointTrigger can be tackled separately from the
>> use-case
>>>>> in FLIP-309.
>>>>> 
>>>>> 
>>>>>>> I am not sure if there is a typo. Because if
>>>>> backPressuredTimeMsPerSecond
>>>>>> =
>>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
>>>> numRecordsInPerSecond /
>>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
>>>>>>> 
>>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
>>>>>> (numRecordsInPerSecond
>>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
>>>> metricsUpdateInterval"?
>>>>>> 
>>>>>> It looks like there is indeed some mistake in my proposal above. Yours
>>>>> look
>>>>>> more correct, it probably
>>>>>> still needs some safeguard/special handling if
>>>>>> `backPressuredTimeMsPerSecond > 950`
>>>>>> 
>>>>>>> The only information it can access is the backlog.
>>>>>> 
>>>>>> Again no. It can access whatever we want to provide to it.
>>>>>> 
>>>>>> Regarding the rest of your concerns. It's a matter of tweaking the
>>>>>> parameters and the algorithm itself,
>>>>>> and how much safety-net do we want to have. Ultimately, I'm pretty
>> sure
>>>>>> that's a (for 95-99% of cases)
>>>>>> solvable problem. If not, there is always the hacky solution, that
>>>> could
>>>>> be
>>>>>> even integrated into this above
>>>>>> mentioned algorithm as a short circuit to always reach
>> `slow-interval`.
>>>>>> 
>>>>>> Apart of that, you picked 3 minutes as the checkpointing interval in
>>>> your
>>>>>> counter example. In most cases
>>>>>> any interval above 1 minute would inflict pretty negligible overheads,
>>>> so
>>>>>> all in all, I would doubt there is
>>>>>> a significant benefit (in most cases) of increasing 3 minute
>> checkpoint
>>>>>> interval to anything more, let alone
>>>>>> 30 minutes.
>>>>>> 
>>>>> 
>>>>> I am not sure we should design the algorithm with the assumption that
>> the
>>>>> short checkpointing interval will always be higher than 1 minute etc.
>>>>> 
>>>>> I agree the proposed algorithm can solve most cases where the resource
>> is
>>>>> sufficient and there is always no backlog in source subtasks. On the
>>>> other
>>>>> hand, what makes SRE
>>>>> life hard is probably the remaining 1-5% cases where the traffic is
>> spiky
>>>>> and the cluster is reaching its capacity limit. The ability to predict
>>>> and
>>>>> control Flink job's behavior (including checkpointing interval) can
>>>>> considerably reduce the burden of manging Flink jobs.
>>>>> 
>>>>> Best,
>>>>> Dong
>>>>> 
>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> Piotrek
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a):
>>>>>> 
>>>>>>> Hi Piotr,
>>>>>>> 
>>>>>>> Thanks for the explanations. I have some followup questions below.
>>>>>>> 
>>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi All,
>>>>>>>> 
>>>>>>>> Thanks for chipping in the discussion Ahmed!
>>>>>>>> 
>>>>>>>> Regarding using the REST API. Currently I'm leaning towards
>>>>>> implementing
>>>>>>>> this feature inside the Flink itself, via some pluggable interface.
>>>>>>>> REST API solution would be tempting, but I guess not everyone is
>>>>> using
>>>>>>>> Flink Kubernetes Operator.
>>>>>>>> 
>>>>>>>> @Dong
>>>>>>>> 
>>>>>>>>> I am not sure metrics such as isBackPressured are already sent to
>>>>> JM.
>>>>>>>> 
>>>>>>>> Fetching code path on the JM:
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
>>>>>>>> 
>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>>>>>>>> 
>>>>>>>> Example code path accessing Task level metrics via JM using the
>>>>>>>> `MetricStore`:
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>>>>>>>> 
>>>>>>> 
>>>>>>> Thanks for the code reference. I checked the code that invoked these
>>>>> two
>>>>>>> classes and found the following information:
>>>>>>> 
>>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis currently invoked
>>>> only
>>>>>>> when AggregatingJobsMetricsHandler is invoked.
>>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by
>>>>>>> WebMonitorEndpoint#initializeHandlers
>>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by
>>>>>> RestServerEndpoint.
>>>>>>> And RestServerEndpoint invokes these handlers in response to external
>>>>>> REST
>>>>>>> request.
>>>>>>> 
>>>>>>> I understand that JM will get the backpressure-related metrics every
>>>>> time
>>>>>>> the RestServerEndpoint receives the REST request to get these
>>>> metrics.
>>>>>> But
>>>>>>> I am not sure if RestServerEndpoint is already always receiving the
>>>>> REST
>>>>>>> metrics at regular interval (suppose there is no human manually
>>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
>>>>> interval?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>>> For example, let's say every source operator subtask reports this
>>>>>>> metric
>>>>>>>> to
>>>>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each
>>>>>>> subtask
>>>>>>>>> is backpressured roughly 10% of the total time due to traffic
>>>>> spikes
>>>>>>> (and
>>>>>>>>> limited buffer). Then at any given time, there are 1 - 0.9^100 =
>>>>>>> 99.997%
>>>>>>>>> chance that there is at least one subtask that is backpressured.
>>>>> Then
>>>>>>> we
>>>>>>>>> have to wait for at least 10 seconds to check again.
>>>>>>>> 
>>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like
>>>>>>>> busyTimeMsPerSecond) are not subject to that problem.
>>>>>>>> They are recalculated once every metric fetching interval, and they
>>>>>>> report
>>>>>>>> accurately on average the given subtask spent
>>>>>> busy/idling/backpressured.
>>>>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s.
>>>>>>> 
>>>>>>> 
>>>>>>> Suppose every subtask is already reporting
>>>> backPressuredTimeMsPerSecond
>>>>>> to
>>>>>>> JM once every 100 ms. If a job has 10 operators (that are not
>>>> chained)
>>>>>> and
>>>>>>> each operator has 100 subtasks, then JM would need to handle 10000
>>>>>> requests
>>>>>>> per second to receive metrics from these 1000 subtasks. It seems
>>>> like a
>>>>>>> non-trivial overhead for medium-to-large sized jobs and can make JM
>>>> the
>>>>>>> performance bottleneck during job execution.
>>>>>>> 
>>>>>>> I would be surprised if Flink is already paying this much overhead
>>>> just
>>>>>> for
>>>>>>> metrics monitoring. That is the main reason I still doubt it is true.
>>>>> Can
>>>>>>> you show where this 100 ms is currently configured?
>>>>>>> 
>>>>>>> Alternatively, maybe you mean that we should add extra code to invoke
>>>>> the
>>>>>>> REST API at 100 ms interval. Then that means we need to considerably
>>>>>>> increase the network/cpu overhead at JM, where the overhead will
>>>>> increase
>>>>>>> as the number of TM/slots increase, which may pose risk to the
>>>>>> scalability
>>>>>>> of the proposed design. I am not sure we should do this. What do you
>>>>>> think?
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>>> While it will be nice to support additional use-cases
>>>>>>>>> with one proposal, it is probably also reasonable to make
>>>>> incremental
>>>>>>>>> progress and support the low-hanging-fruit use-case first. The
>>>>> choice
>>>>>>>>> really depends on the complexity and the importance of supporting
>>>>> the
>>>>>>>> extra
>>>>>>>>> use-cases.
>>>>>>>> 
>>>>>>>> That would be true, if that was a private implementation detail or
>>>> if
>>>>>> the
>>>>>>>> low-hanging-fruit-solution would be on the direct path to the final
>>>>>>>> solution.
>>>>>>>> That's unfortunately not the case here. This will add public facing
>>>>>> API,
>>>>>>>> that we will later need to maintain, no matter what the final
>>>>> solution
>>>>>>> will
>>>>>>>> be,
>>>>>>>> and at the moment at least I don't see it being related to a
>>>>> "perfect"
>>>>>>>> solution.
>>>>>>> 
>>>>>>> 
>>>>>>> Sure. Then let's decide the final solution first.
>>>>>>> 
>>>>>>> 
>>>>>>>>> I guess the point is that the suggested approach, which
>>>> dynamically
>>>>>>>>> determines the checkpointing interval based on the backpressure,
>>>>> may
>>>>>>>> cause
>>>>>>>>> regression when the checkpointing interval is relatively low.
>>>> This
>>>>>>> makes
>>>>>>>> it
>>>>>>>>> hard for users to enable this feature in production. It is like
>>>> an
>>>>>>>>> auto-driving system that is not guaranteed to work
>>>>>>>> 
>>>>>>>> Yes, creating a more generic solution that would require less
>>>>>>> configuration
>>>>>>>> is usually more difficult then static configurations.
>>>>>>>> It doesn't mean we shouldn't try to do that. Especially that if my
>>>>>>> proposed
>>>>>>>> algorithm wouldn't work good enough, there is
>>>>>>>> an obvious solution, that any source could add a metric, like let
>>>> say
>>>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
>>>>>>>> could use this as an override to always switch to the
>>>>>>>> "slowCheckpointInterval". I don't think we need it, but that's
>>>> always
>>>>>> an
>>>>>>>> option
>>>>>>>> that would be basically equivalent to your original proposal. Or
>>>> even
>>>>>>>> source could add "suggestedCheckpointInterval : int", and
>>>>>>>> `CheckpointTrigger` could use that value if present as a hint in
>>>> one
>>>>>> way
>>>>>>> or
>>>>>>>> another.
>>>>>>>> 
>>>>>>> 
>>>>>>> So far we have talked about the possibility of using
>>>> CheckpointTrigger
>>>>>> and
>>>>>>> mentioned the CheckpointTrigger
>>>>>>> and read metric values.
>>>>>>> 
>>>>>>> Can you help answer the following questions so that I can understand
>>>>> the
>>>>>>> alternative solution more concretely:
>>>>>>> 
>>>>>>> - What is the interface of this CheckpointTrigger? For example, are
>>>> we
>>>>>>> going to give CheckpointTrigger a context that it can use to fetch
>>>>>>> arbitrary metric values? This can help us understand what information
>>>>>> this
>>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
>>>> decision.
>>>>>>> - Where is this CheckpointTrigger running? For example, is it going
>>>> to
>>>>>> run
>>>>>>> on the subtask of every source operator? Or is it going to run on the
>>>>> JM?
>>>>>>> - Are we going to provide a default implementation of this
>>>>>>> CheckpointTrigger in Flink that implements the algorithm described
>>>>> below,
>>>>>>> or do we expect each source operator developer to implement their own
>>>>>>> CheckpointTrigger?
>>>>>>> - How can users specify the
>>>>>> fastCheckpointInterval/slowCheckpointInterval?
>>>>>>> For example, will we provide APIs on the CheckpointTrigger that
>>>>> end-users
>>>>>>> can use to specify the checkpointing interval? What would that look
>>>>> like?
>>>>>>> 
>>>>>>> Overall, my gut feel is that the alternative approach based on
>>>>>>> CheckpointTrigger is more complicated and harder to use. And it
>>>>> probably
>>>>>>> also has the issues of "having two places to configure checkpointing
>>>>>>> interval" and "giving flexibility for every source to implement a
>>>>>> different
>>>>>>> API" (as mentioned below).
>>>>>>> 
>>>>>>> Maybe we can evaluate it more after knowing the answers to the above
>>>>>>> questions.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>>> On the other hand, the approach currently proposed in the FLIP is
>>>>>> much
>>>>>>>>> simpler as it does not depend on backpressure. Users specify the
>>>>>> extra
>>>>>>>>> interval requirement on the specific sources (e.g. HybridSource,
>>>>>> MySQL
>>>>>>>> CDC
>>>>>>>>> Source) and can easily know the checkpointing interval will be
>>>> used
>>>>>> on
>>>>>>>> the
>>>>>>>>> continuous phase of the corresponding source. This is pretty much
>>>>>> same
>>>>>>> as
>>>>>>>>> how users use the existing execution.checkpointing.interval
>>>> config.
>>>>>> So
>>>>>>>>> there is no extra concern of regression caused by this approach.
>>>>>>>> 
>>>>>>>> To an extent, but as I have already previously mentioned I really
>>>>>> really
>>>>>>> do
>>>>>>>> not like idea of:
>>>>>>>> - having two places to configure checkpointing interval (config
>>>>> file
>>>>>>> and
>>>>>>>> in the Source builders)
>>>>>>>> - giving flexibility for every source to implement a different
>>>> API
>>>>>> for
>>>>>>>> that purpose
>>>>>>>> - creating a solution that is not generic enough, so that we will
>>>>>> need
>>>>>>> a
>>>>>>>> completely different mechanism in the future anyway
>>>>>>>> 
>>>>>>> 
>>>>>>> Yeah, I understand different developers might have different
>>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a
>>>>> perfect
>>>>>>> solution and we have to choose based on the pros/cons of these
>>>>> solutions.
>>>>>>> 
>>>>>>> I agree with you that, all things being equal, it is preferable to 1)
>>>>>> have
>>>>>>> one place to configure checkpointing intervals, 2) have all source
>>>>>>> operators use the same API, and 3) create a solution that is generic
>>>>> and
>>>>>>> last lasting. Note that these three goals affects the usability and
>>>>>>> extensibility of the API, but not necessarily the
>>>> stability/performance
>>>>>> of
>>>>>>> the production job.
>>>>>>> 
>>>>>>> BTW, there are also other preferrable goals. For example, it is very
>>>>>> useful
>>>>>>> for the job's behavior to be predictable and interpretable so that
>>>> SRE
>>>>>> can
>>>>>>> operator/debug the Flink in an easier way. We can list these
>>>> pros/cons
>>>>>>> altogether later.
>>>>>>> 
>>>>>>> I am wondering if we can first agree on the priority of goals we want
>>>>> to
>>>>>>> achieve. IMO, it is a hard-requirement for the user-facing API to be
>>>>>>> clearly defined and users should be able to use the API without
>>>> concern
>>>>>> of
>>>>>>> regression. And this requirement is more important than the other
>>>> goals
>>>>>>> discussed above because it is related to the stability/performance of
>>>>> the
>>>>>>> production job. What do you think?
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>>> Sounds good. Looking forward to learning more ideas.
>>>>>>>> 
>>>>>>>> I have thought about this a bit more, and I think we don't need to
>>>>>> check
>>>>>>>> for the backpressure status, or how much overloaded all of the
>>>>>> operators
>>>>>>>> are.
>>>>>>>> We could just check three things for source operators:
>>>>>>>> 1. pendingRecords (backlog length)
>>>>>>>> 2. numRecordsInPerSecond
>>>>>>>> 3. backPressuredTimeMsPerSecond
>>>>>>>> 
>>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
>>>>>>>> // Next line calculates how many records can we consume from the
>>>>>> backlog,
>>>>>>>> assuming
>>>>>>>> // that magically the reason behind a backpressure vanishes. We
>>>> will
>>>>>> use
>>>>>>>> this only as
>>>>>>>> // a safeguard  against scenarios like for example if backpressure
>>>>> was
>>>>>>>> caused by some
>>>>>>>> // intermittent failure/performance degradation.
>>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
>>>>> (1000
>>>>>>>> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>>>>>>>> 
>>>>>>> 
>>>>>>> I am not sure if there is a typo. Because if
>>>>>> backPressuredTimeMsPerSecond =
>>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
>>>> numRecordsInPerSecond /
>>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
>>>>>>> 
>>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
>>>>>> (numRecordsInPerSecond
>>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
>>>> metricsUpdateInterval"?
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the
>>>>>>> backlog
>>>>>>>> as
>>>>>>>> // a safeguard against an intermittent back pressure problems, so
>>>>> that
>>>>>> we
>>>>>>>> don't
>>>>>>>> // calculate next checkpoint interval far far in the future, while
>>>>> the
>>>>>>>> backpressure
>>>>>>>> // goes away before we will recalculate metrics and new
>>>> checkpointing
>>>>>>>> interval
>>>>>>>> timeToConsumeBacklog = (pendingRecords -
>>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Then we can use those numbers to calculate desired checkpointed
>>>>>> interval
>>>>>>>> for example like this:
>>>>>>>> 
>>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
>>>> //this
>>>>>> may
>>>>>>>> need some refining
>>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
>>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
>>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
>>>>>>>> 
>>>>>>>> WDYT?
>>>>>>> 
>>>>>>> 
>>>>>>> I think the idea of the above algorithm is to incline to use the
>>>>>>> fastCheckpointInterval unless we are very sure the backlog will take
>>>> a
>>>>>> long
>>>>>>> time to process. This can alleviate the concern of regression during
>>>>> the
>>>>>>> continuous_bounded phase since we are more likely to use the
>>>>>>> fastCheckpointInterval. However, it can cause regression during the
>>>>>> bounded
>>>>>>> phase.
>>>>>>> 
>>>>>>> I will use a concrete example to explain the risk of regression:
>>>>>>> - The user is using HybridSource to read from HDFS followed by Kafka.
>>>>> The
>>>>>>> data in HDFS is old and there is no need for data freshness for the
>>>>> data
>>>>>> in
>>>>>>> HDFS.
>>>>>>> - The user configures the job as below:
>>>>>>> - fastCheckpointInterval = 3 minutes
>>>>>>> - slowCheckpointInterval = 30 minutes
>>>>>>> - metricsUpdateInterval = 100 ms
>>>>>>> 
>>>>>>> Using the above formulate, we can know that once pendingRecords
>>>>>>> <= numRecordsInPerSecond * 30-minutes, then
>>>>> calculatedCheckpointInterval
>>>>>> <=
>>>>>>> 3 minutes, meaning that we will use slowCheckpointInterval as the
>>>>>>> checkpointing interval. Then in the last 30 minutes of the bounded
>>>>> phase,
>>>>>>> the checkpointing frequency will be 10X higher than what the user
>>>>> wants.
>>>>>>> 
>>>>>>> Also note that the same issue would also considerably limit the
>>>>> benefits
>>>>>> of
>>>>>>> the algorithm. For example, during the continuous phase, the
>>>> algorithm
>>>>>> will
>>>>>>> only be better than the approach in FLIP-309 when there is at least
>>>>>>> 30-minutes worth of backlog in the source.
>>>>>>> 
>>>>>>> Sure, having a slower checkpointing interval in this extreme case
>>>>> (where
>>>>>>> there is 30-minutes backlog in the continous-unbounded phase) is
>>>> still
>>>>>>> useful when this happens. But since this is the un-common case, and
>>>> the
>>>>>>> right solution is probably to do capacity planning to avoid this from
>>>>>>> happening in the first place, I am not sure it is worth optimizing
>>>> for
>>>>>> this
>>>>>>> case at the cost of regression in the bounded phase and the reduced
>>>>>>> operational predictability for users (e.g. what checkpointing
>>>> interval
>>>>>>> should I expect at this stage of the job).
>>>>>>> 
>>>>>>> I think the fundamental issue with this algorithm is that it is
>>>> applied
>>>>>> to
>>>>>>> both the bounded phases and the continous_unbounded phases without
>>>>>> knowing
>>>>>>> which phase the job is running at. The only information it can access
>>>>> is
>>>>>>> the backlog. But two sources with the same amount of backlog do not
>>>>>>> necessarily mean they have the same data freshness requirement.
>>>>>>> 
>>>>>>> In this particular example, users know that the data in HDFS is very
>>>>> old
>>>>>>> and there is no need for data freshness. Users can express signals
>>>> via
>>>>>> the
>>>>>>> per-source API proposed in the FLIP. This is why the current approach
>>>>> in
>>>>>>> FLIP-309 can be better in this case.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> Best,
>>>>>>> Dong
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Piotrek
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 


Reply via email to