Looks good to me! Thanks Dong, Yunfeng and all for your discussion and design.
Best, Jingsong On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> wrote: > > Thank you Dong for driving this FLIP. > > The new design looks good to me! > > Best, > Jark > > > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道: > > > > Thank you Leonard for the review! > > > > Hi Piotr, do you have any comments on the latest proposal? > > > > I am wondering if it is OK to start the voting thread this week. > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote: > > > >> Thanks Dong for driving this FLIP forward! > >> > >> Introducing `backlog status` concept for flink job makes sense to me as > >> following reasons: > >> > >> From concept/API design perspective, it’s more general and natural than > >> above proposals as it can be used in HybridSource for bounded records, CDC > >> Source for history snapshot and general sources like KafkaSource for > >> historical messages. > >> > >> From user cases/requirements, I’ve seen many users manually to set larger > >> checkpoint interval during backfilling and then set a shorter checkpoint > >> interval for real-time processing in their production environments as a > >> flink application optimization. Now, the flink framework can make this > >> optimization no longer require the user to set the checkpoint interval and > >> restart the job multiple times. > >> > >> Following supporting using larger checkpoint for job under backlog status > >> in current FLIP, we can explore supporting larger parallelism/memory/cpu > >> for job under backlog status in the future. > >> > >> In short, the updated FLIP looks good to me. > >> > >> > >> Best, > >> Leonard > >> > >> > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote: > >>> > >>> Hi Piotr, > >>> > >>> Thanks again for proposing the isProcessingBacklog concept. > >>> > >>> After discussing with Becket Qin and thinking about this more, I agree it > >>> is a better idea to add a top-level concept to all source operators to > >>> address the target use-case. > >>> > >>> The main reason that changed my mind is that isProcessingBacklog can be > >>> described as an inherent/nature attribute of every source instance and > >> its > >>> semantics does not need to depend on any specific checkpointing policy. > >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we > >>> have considered so far (e.g. HybridSource and MySQL CDC source) without > >>> asking users to explicitly configure the per-source behavior, which > >> indeed > >>> provides better user experience. > >>> > >>> I have updated the FLIP based on the latest suggestions. The latest FLIP > >> no > >>> longer introduces per-source config that can be used by end-users. While > >> I > >>> agree with you that CheckpointTrigger can be a useful feature to address > >>> additional use-cases, I am not sure it is necessary for the use-case > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately > >>> in another FLIP? > >>> > >>> Can you help take another look at the updated FLIP? > >>> > >>> Best, > >>> Dong > >>> > >>> > >>> > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> > >>> wrote: > >>> > >>>> Hi Dong, > >>>> > >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being > >>>>> "backpressured" at a given time (due to random traffic spikes). Then at > >>>> any > >>>>> given time, the chance of the job > >>>>> being considered not-backpressured = (1-0.01)^1000. 
Since we evaluate > >> the > >>>>> backpressure metric once a second, the estimated time for the job > >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = > >> 23163 > >>>>> sec = 6.4 hours. > >>>>> > >>>>> This means that the job will effectively always use the longer > >>>>> checkpointing interval. It looks like a real concern, right? > >>>> > >>>> Sorry I don't understand where you are getting those numbers from. > >>>> Instead of trying to find loophole after loophole, could you try to > >> think > >>>> how a given loophole could be improved/solved? > >>>> > >>>>> Hmm... I honestly think it will be useful to know the APIs due to the > >>>>> following reasons. > >>>> > >>>> Please propose something. I don't think it's needed. > >>>> > >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the > >>>> APIs > >>>>> of this alternative approach be more or less usable? > >>>> > >>>> Everything that you originally wanted to achieve in FLIP-309, you could > >> do > >>>> as well in my proposal. > >>>> Vide my many mentions of the "hacky solution". > >>>> > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow > >>>>> checkpointing interval to change dynamically even during the unbounded > >>>>> phase) as it claims? > >>>> > >>>> I don't see why not. > >>>> > >>>>> - Can these APIs be decoupled from the APIs currently proposed in > >>>> FLIP-309? > >>>> > >>>> Yes > >>>> > >>>>> For example, if the APIs of this alternative approach can be decoupled > >>>> from > >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to > >>>>> work on this extra use-case with a more advanced/complicated design > >>>>> separately in a followup work. > >>>> > >>>> As I voiced my concerns previously, the current design of FLIP-309 would > >>>> clog the public API and in the long run confuse the users. IMO It's > >>>> addressing the > >>>> problem in the wrong place. > >>>> > >>>>> Hmm.. do you mean we can do the following: > >>>>> - Have all source operators emit a metric named "processingBacklog". > >>>>> - Add a job-level config that specifies "the checkpointing interval to > >> be > >>>>> used when any source is processing backlog". > >>>>> - The JM collects the "processingBacklog" periodically from all source > >>>>> operators and uses the newly added config value as appropriate. > >>>> > >>>> Yes. > >>>> > >>>>> The challenge with this approach is that we need to define the > >> semantics > >>>> of > >>>>> this "processingBacklog" metric and have all source operators > >>>>> implement this metric. I am not sure we are able to do this yet without > >>>>> having users explicitly provide this information on a per-source basis. > >>>>> > >>>>> Suppose the job read from a bounded Kafka source, should it emit > >>>>> "processingBacklog=true"? If yes, then the job might use long > >>>> checkpointing > >>>>> interval even > >>>>> if the job is asked to process data starting from now to the next 1 > >> hour. > >>>>> If no, then the job might use the short checkpointing interval > >>>>> even if the job is asked to re-process data starting from 7 days ago. > >>>> > >>>> Yes. The same can be said of your proposal. 
Your proposal has the very > >> same > >>>> issues > >>>> that every source would have to implement it differently, most sources > >>>> would > >>>> have no idea how to properly calculate the new requested checkpoint > >>>> interval, > >>>> for those that do know how to do that, user would have to configure > >> every > >>>> source > >>>> individually and yet again we would end up with a system, that works > >> only > >>>> partially in > >>>> some special use cases (HybridSource), that's confusing the users even > >>>> more. > >>>> > >>>> That's why I think the more generic solution, working primarily on the > >> same > >>>> metrics that are used by various auto scaling solutions (like Flink K8s > >>>> operator's > >>>> autosaler) would be better. The hacky solution I proposed to: > >>>> 1. show you that the generic solution is simply a superset of your > >> proposal > >>>> 2. if you are adamant that busyness/backpressured/records processing > >>>> rate/pending records > >>>> metrics wouldn't cover your use case sufficiently (imo they can), > >> then > >>>> you can very easily > >>>> enhance this algorithm with using some hints from the sources. Like > >>>> "processingBacklog==true" > >>>> to short circuit the main algorithm, if `processingBacklog` is > >>>> available. > >>>> > >>>> Best, > >>>> Piotrek > >>>> > >>>> > >>>> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a): > >>>> > >>>>> Hi again Piotr, > >>>>> > >>>>> Thank you for the reply. Please see my reply inline. > >>>>> > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski < > >>>> piotr.nowoj...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> Hi again Dong, > >>>>>> > >>>>>>> I understand that JM will get the backpressure-related metrics every > >>>>> time > >>>>>>> the RestServerEndpoint receives the REST request to get these > >>>> metrics. > >>>>>> But > >>>>>>> I am not sure if RestServerEndpoint is already always receiving the > >>>>> REST > >>>>>>> metrics at regular interval (suppose there is no human manually > >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the > >>>>> interval? > >>>>>> > >>>>>> Good catch, I've thought that metrics are pre-emptively sent to JM > >>>> every > >>>>> 10 > >>>>>> seconds. > >>>>>> Indeed that's not the case at the moment, and that would have to be > >>>>>> improved. > >>>>>> > >>>>>>> I would be surprised if Flink is already paying this much overhead > >>>> just > >>>>>> for > >>>>>>> metrics monitoring. That is the main reason I still doubt it is true. > >>>>> Can > >>>>>>> you show where this 100 ms is currently configured? > >>>>>>> > >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke > >>>>> the > >>>>>>> REST API at 100 ms interval. Then that means we need to considerably > >>>>>>> increase the network/cpu overhead at JM, where the overhead will > >>>>> increase > >>>>>>> as the number of TM/slots increase, which may pose risk to the > >>>>>> scalability > >>>>>>> of the proposed design. I am not sure we should do this. What do you > >>>>>> think? > >>>>>> > >>>>>> Sorry. I didn't mean metric should be reported every 100ms. I meant > >>>> that > >>>>>> "backPressuredTimeMsPerSecond (metric) would report (a value of) > >>>>> 100ms/s." > >>>>>> once per metric interval (10s?). > >>>>>> > >>>>> > >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being > >>>>> "backpressured" at a given time (due to random traffic spikes). 
Then at > >>>> any > >>>>> given time, the chance of the job > >>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate > >> the > >>>>> backpressure metric once a second, the estimated time for the job > >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = > >> 23163 > >>>>> sec = 6.4 hours. > >>>>> > >>>>> This means that the job will effectively always use the longer > >>>>> checkpointing interval. It looks like a real concern, right? > >>>>> > >>>>> > >>>>>>> - What is the interface of this CheckpointTrigger? For example, are > >>>> we > >>>>>>> going to give CheckpointTrigger a context that it can use to fetch > >>>>>>> arbitrary metric values? This can help us understand what information > >>>>>> this > >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint > >>>> decision. > >>>>>> > >>>>>> I honestly don't think this is important at this stage of the > >>>> discussion. > >>>>>> It could have > >>>>>> whatever interface we would deem to be best. Required things: > >>>>>> > >>>>>> - access to at least a subset of metrics that the given > >>>>> `CheckpointTrigger` > >>>>>> requests, > >>>>>> for example via some registration mechanism, so we don't have to > >>>> fetch > >>>>>> all of the > >>>>>> metrics all the time from TMs. > >>>>>> - some way to influence `CheckpointCoordinator`. Either via manually > >>>>>> triggering > >>>>>> checkpoints, and/or ability to change the checkpointing interval. > >>>>>> > >>>>> > >>>>> Hmm... I honestly think it will be useful to know the APIs due to the > >>>>> following reasons. > >>>>> > >>>>> We would need to know the concrete APIs to gauge the following: > >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the > >>>> APIs > >>>>> of this alternative approach be more or less usable? > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow > >>>>> checkpointing interval to change dynamically even during the unbounded > >>>>> phase) as it claims? > >>>>> - Can these APIs be decoupled from the APIs currently proposed in > >>>> FLIP-309? > >>>>> > >>>>> For example, if the APIs of this alternative approach can be decoupled > >>>> from > >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to > >>>>> work on this extra use-case with a more advanced/complicated design > >>>>> separately in a followup work. > >>>>> > >>>>> > >>>>>>> - Where is this CheckpointTrigger running? For example, is it going > >>>> to > >>>>>> run > >>>>>>> on the subtask of every source operator? Or is it going to run on the > >>>>> JM? > >>>>>> > >>>>>> IMO on the JM. > >>>>>> > >>>>>>> - Are we going to provide a default implementation of this > >>>>>>> CheckpointTrigger in Flink that implements the algorithm described > >>>>> below, > >>>>>>> or do we expect each source operator developer to implement their own > >>>>>>> CheckpointTrigger? > >>>>>> > >>>>>> As I mentioned before, I think we should provide at the very least the > >>>>>> implementation > >>>>>> that replaces the current triggering mechanism (statically configured > >>>>>> checkpointing interval) > >>>>>> and it would be great to provide the backpressure monitoring trigger > >> as > >>>>>> well. > >>>>>> > >>>>> > >>>>> I agree that if there is a good use-case that can be addressed by the > >>>>> proposed CheckpointTrigger, then it is reasonable > >>>>> to add CheckpointTrigger and replace the current triggering mechanism > >>>> with > >>>>> it. 
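For illustration only: given the requirements Piotr lists above (a registration mechanism so the trigger only receives the metrics it asks for, a way to influence the CheckpointCoordinator, and running on the JM), such a pluggable trigger interface might look roughly like the sketch below. All names here are hypothetical; this is not an existing Flink API nor part of FLIP-309.

import java.util.Map;
import java.util.Set;

public interface CheckpointTrigger {
    /** Metric names this trigger registers for, so the JM only fetches these from the TMs. */
    Set<String> requiredMetrics();

    /** Called periodically on the JM with the latest values of the registered metrics. */
    void onMetricsUpdated(Map<String, Double> metrics, Context context);

    /** Handle through which the trigger influences the CheckpointCoordinator. */
    interface Context {
        long lastCheckpointTimestamp();

        /** Change the checkpointing interval used by the CheckpointCoordinator. */
        void setCheckpointInterval(long intervalMs);

        /** Or trigger a checkpoint immediately. */
        void triggerCheckpoint();
    }
}

A backpressure-monitoring implementation would then register for metrics such as pendingRecords, numRecordsInPerSecond and backPressuredTimeMsPerSecond and call setCheckpointInterval with whatever value it derives from them.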
> >>>>> > >>>>> I also agree that we will likely find such a use-case. For example, > >>>> suppose > >>>>> the source records have event timestamps, then it is likely > >>>>> that we can use the trigger to dynamically control the checkpointing > >>>>> interval based on the difference between the watermark and current > >> system > >>>>> time. > >>>>> > >>>>> But I am not sure the addition of this CheckpointTrigger should be > >>>> coupled > >>>>> with FLIP-309. Whether or not it is coupled probably depends on the > >>>>> concrete API design around CheckpointTrigger. > >>>>> > >>>>> If you would be adamant that the backpressure monitoring doesn't cover > >>>> well > >>>>>> enough your use case, I would be ok to provide the hacky version that > >> I > >>>>>> also mentioned > >>>>>> before: > >>>>> > >>>>> > >>>>>> """ > >>>>>> Especially that if my proposed algorithm wouldn't work good enough, > >>>> there > >>>>>> is > >>>>>> an obvious solution, that any source could add a metric, like let say > >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` > >>>>>> could use this as an override to always switch to the > >>>>>> "slowCheckpointInterval". I don't think we need it, but that's always > >>>> an > >>>>>> option > >>>>>> that would be basically equivalent to your original proposal. > >>>>>> """ > >>>>>> > >>>>> > >>>>> Hmm.. do you mean we can do the following: > >>>>> - Have all source operators emit a metric named "processingBacklog". > >>>>> - Add a job-level config that specifies "the checkpointing interval to > >> be > >>>>> used when any source is processing backlog". > >>>>> - The JM collects the "processingBacklog" periodically from all source > >>>>> operators and uses the newly added config value as appropriate. > >>>>> > >>>>> The challenge with this approach is that we need to define the > >> semantics > >>>> of > >>>>> this "processingBacklog" metric and have all source operators > >>>>> implement this metric. I am not sure we are able to do this yet without > >>>>> having users explicitly provide this information on a per-source basis. > >>>>> > >>>>> Suppose the job read from a bounded Kafka source, should it emit > >>>>> "processingBacklog=true"? If yes, then the job might use long > >>>> checkpointing > >>>>> interval even > >>>>> if the job is asked to process data starting from now to the next 1 > >> hour. > >>>>> If no, then the job might use the short checkpointing interval > >>>>> even if the job is asked to re-process data starting from 7 days ago. > >>>>> > >>>>> > >>>>>> > >>>>>>> - How can users specify the > >>>>>> fastCheckpointInterval/slowCheckpointInterval? > >>>>>>> For example, will we provide APIs on the CheckpointTrigger that > >>>>> end-users > >>>>>>> can use to specify the checkpointing interval? What would that look > >>>>> like? > >>>>>> > >>>>>> Also as I mentioned before, just like metric reporters are configured: > >>>>>> > >>>>>> > >>>>> > >>>> > >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/ > >>>>>> Every CheckpointTrigger could have its own custom configuration. > >>>>>> > >>>>>>> Overall, my gut feel is that the alternative approach based on > >>>>>>> CheckpointTrigger is more complicated > >>>>>> > >>>>>> Yes, as usual, more generic things are more complicated, but often > >> more > >>>>>> useful in the long run. > >>>>>> > >>>>>>> and harder to use. > >>>>>> > >>>>>> I don't agree. 
Why setting in config > >>>>>> > >>>>>> execution.checkpointing.trigger: > >>>> BackPressureMonitoringCheckpointTrigger > >>>>>> > >>>>>> > >>>>> > >>>> > >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: > >>>>>> 1s > >>>>>> > >>>>>> > >>>>> > >>>> > >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: > >>>>>> 30s > >>>>>> > >>>>>> that we could even provide a shortcut to the above construct via: > >>>>>> > >>>>>> execution.checkpointing.fast-interval: 1s > >>>>>> execution.checkpointing.slow-interval: 30s > >>>>>> > >>>>>> is harder compared to setting two/three checkpoint intervals, one in > >>>> the > >>>>>> config/or via `env.enableCheckpointing(x)`, > >>>>>> secondly passing one/two (fast/slow) values on the source itself? > >>>>>> > >>>>> > >>>>> If we can address the use-case by providing just the two job-level > >> config > >>>>> as described above, I agree it will indeed be simpler. > >>>>> > >>>>> I have tried to achieve this goal. But the caveat is that it requires > >>>> much > >>>>> more work than described above in order to give the configs > >> well-defined > >>>>> semantics. So I find it simpler to just use the approach in FLIP-309. > >>>>> > >>>>> Let me explain my concern below. It will be great if you or someone > >> else > >>>>> can help provide a solution. > >>>>> > >>>>> 1) We need to clearly document when the fast-interval and slow-interval > >>>>> will be used so that users can derive the expected behavior of the job > >>>> and > >>>>> be able to config these values. > >>>>> > >>>>> 2) The trigger of fast/slow interval depends on the behavior of the > >>>> source > >>>>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source > >>>>> operator (e.g. boundedness) can describe the target behavior. For > >>>> example, > >>>>> MySQL CDC internally has two phases, namely snapshot phase and binlog > >>>>> phase, which are not explicitly exposed to its users via source > >> operator > >>>>> API. And we probably should not enumerate all internal phases of all > >>>> source > >>>>> operators that are affected by fast/slow interval. > >>>>> > >>>>> 3) An alternative approach might be to define a new concept (e.g. > >>>>> processingBacklog) that is applied to all source operators. Then the > >>>>> fast/slow interval's documentation can depend on this concept. That > >> means > >>>>> we have to add a top-level concept (similar to source boundedness) and > >>>>> require all source operators to specify how they enforce this concept > >>>> (e.g. > >>>>> FileSystemSource always emits processingBacklog=true). And there might > >> be > >>>>> cases where the source itself (e.g. a bounded Kafka Source) can not > >>>>> automatically derive the value of this concept, in which case we need > >> to > >>>>> provide option for users to explicitly specify the value for this > >> concept > >>>>> on a per-source basis. > >>>>> > >>>>> > >>>>> > >>>>>>> And it probably also has the issues of "having two places to > >>>> configure > >>>>>> checkpointing > >>>>>>> interval" and "giving flexibility for every source to implement a > >>>>>> different > >>>>>>> API" (as mentioned below). > >>>>>> > >>>>>> No, it doesn't. > >>>>>> > >>>>>>> IMO, it is a hard-requirement for the user-facing API to be > >>>>>>> clearly defined and users should be able to use the API without > >>>> concern > >>>>>> of > >>>>>>> regression. 
And this requirement is more important than the other > >>>> goals > >>>>>>> discussed above because it is related to the stability/performance of > >>>>> the > >>>>>>> production job. What do you think? > >>>>>> > >>>>>> I don't agree with this. There are many things that work something in > >>>>>> between perfectly and well enough > >>>>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%), > >>>>> while > >>>>>> still being very useful. > >>>>>> Good examples are: selection of state backend, unaligned checkpoints, > >>>>>> buffer debloating but frankly if I go > >>>>>> through list of currently available config options, something like > >> half > >>>>> of > >>>>>> them can cause regressions. Heck, > >>>>>> even Flink itself doesn't work perfectly in 100% of the use cases, due > >>>>> to a > >>>>>> variety of design choices. Of > >>>>>> course, the more use cases are fine with said feature, the better, but > >>>> we > >>>>>> shouldn't fixate to perfectly cover > >>>>>> 100% of the cases, as that's impossible. > >>>>>> > >>>>>> In this particular case, if back pressure monitoring trigger can work > >>>>> well > >>>>>> enough in 95% of cases, I would > >>>>>> say that's already better than the originally proposed alternative, > >>>> which > >>>>>> doesn't work at all if user has a large > >>>>>> backlog to reprocess from Kafka, including when using HybridSource > >>>> AFTER > >>>>>> the switch to Kafka has > >>>>>> happened. For the remaining 5%, we should try to improve the behaviour > >>>>> over > >>>>>> time, but ultimately, users can > >>>>>> decide to just run a fixed checkpoint interval (or at worst use the > >>>> hacky > >>>>>> checkpoint trigger that I mentioned > >>>>>> before a couple of times). > >>>>>> > >>>>>> Also to be pedantic, if a user naively selects slow-interval in your > >>>>>> proposal to 30 minutes, when that user's > >>>>>> job fails on average every 15-20minutes, his job can end up in a state > >>>>> that > >>>>>> it can not make any progress, > >>>>>> this arguably is quite serious regression. > >>>>>> > >>>>> > >>>>> I probably should not say it is "hard requirement". After all there are > >>>>> pros/cons. We will need to consider implementation complexity, > >> usability, > >>>>> extensibility etc. > >>>>> > >>>>> I just don't think we should take it for granted to introduce > >> regression > >>>>> for one use-case in order to support another use-case. If we can not > >> find > >>>>> an algorithm/solution that addresses > >>>>> both use-case well, I hope we can be open to tackle them separately so > >>>> that > >>>>> users can choose the option that best fits their needs. > >>>>> > >>>>> All things else being equal, I think it is preferred for user-facing > >> API > >>>> to > >>>>> be clearly defined and let users should be able to use the API without > >>>>> concern of regression. > >>>>> > >>>>> Maybe we can list pros/cons for the alternative approaches we have been > >>>>> discussing and see choose the best approach. And maybe we will end up > >>>>> finding that use-case > >>>>> which needs CheckpointTrigger can be tackled separately from the > >> use-case > >>>>> in FLIP-309. > >>>>> > >>>>> > >>>>>>> I am not sure if there is a typo. Because if > >>>>> backPressuredTimeMsPerSecond > >>>>>> = > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure = > >>>> numRecordsInPerSecond / > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm. 
> >>>>>>> > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = > >>>>>> (numRecordsInPerSecond > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * > >>>> metricsUpdateInterval"? > >>>>>> > >>>>>> It looks like there is indeed some mistake in my proposal above. Yours > >>>>> look > >>>>>> more correct, it probably > >>>>>> still needs some safeguard/special handling if > >>>>>> `backPressuredTimeMsPerSecond > 950` > >>>>>> > >>>>>>> The only information it can access is the backlog. > >>>>>> > >>>>>> Again no. It can access whatever we want to provide to it. > >>>>>> > >>>>>> Regarding the rest of your concerns. It's a matter of tweaking the > >>>>>> parameters and the algorithm itself, > >>>>>> and how much safety-net do we want to have. Ultimately, I'm pretty > >> sure > >>>>>> that's a (for 95-99% of cases) > >>>>>> solvable problem. If not, there is always the hacky solution, that > >>>> could > >>>>> be > >>>>>> even integrated into this above > >>>>>> mentioned algorithm as a short circuit to always reach > >> `slow-interval`. > >>>>>> > >>>>>> Apart of that, you picked 3 minutes as the checkpointing interval in > >>>> your > >>>>>> counter example. In most cases > >>>>>> any interval above 1 minute would inflict pretty negligible overheads, > >>>> so > >>>>>> all in all, I would doubt there is > >>>>>> a significant benefit (in most cases) of increasing 3 minute > >> checkpoint > >>>>>> interval to anything more, let alone > >>>>>> 30 minutes. > >>>>>> > >>>>> > >>>>> I am not sure we should design the algorithm with the assumption that > >> the > >>>>> short checkpointing interval will always be higher than 1 minute etc. > >>>>> > >>>>> I agree the proposed algorithm can solve most cases where the resource > >> is > >>>>> sufficient and there is always no backlog in source subtasks. On the > >>>> other > >>>>> hand, what makes SRE > >>>>> life hard is probably the remaining 1-5% cases where the traffic is > >> spiky > >>>>> and the cluster is reaching its capacity limit. The ability to predict > >>>> and > >>>>> control Flink job's behavior (including checkpointing interval) can > >>>>> considerably reduce the burden of manging Flink jobs. > >>>>> > >>>>> Best, > >>>>> Dong > >>>>> > >>>>> > >>>>>> > >>>>>> Best, > >>>>>> Piotrek > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a): > >>>>>> > >>>>>>> Hi Piotr, > >>>>>>> > >>>>>>> Thanks for the explanations. I have some followup questions below. > >>>>>>> > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org > >>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi All, > >>>>>>>> > >>>>>>>> Thanks for chipping in the discussion Ahmed! > >>>>>>>> > >>>>>>>> Regarding using the REST API. Currently I'm leaning towards > >>>>>> implementing > >>>>>>>> this feature inside the Flink itself, via some pluggable interface. > >>>>>>>> REST API solution would be tempting, but I guess not everyone is > >>>>> using > >>>>>>>> Flink Kubernetes Operator. > >>>>>>>> > >>>>>>>> @Dong > >>>>>>>> > >>>>>>>>> I am not sure metrics such as isBackPressured are already sent to > >>>>> JM. 
> >>>>>>>> > >>>>>>>> Fetching code path on the JM: > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture > >>>>>>>> > >>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add > >>>>>>>> > >>>>>>>> Example code path accessing Task level metrics via JM using the > >>>>>>>> `MetricStore`: > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler > >>>>>>>> > >>>>>>> > >>>>>>> Thanks for the code reference. I checked the code that invoked these > >>>>> two > >>>>>>> classes and found the following information: > >>>>>>> > >>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis currently invoked > >>>> only > >>>>>>> when AggregatingJobsMetricsHandler is invoked. > >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by > >>>>>>> WebMonitorEndpoint#initializeHandlers > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by > >>>>>> RestServerEndpoint. > >>>>>>> And RestServerEndpoint invokes these handlers in response to external > >>>>>> REST > >>>>>>> request. > >>>>>>> > >>>>>>> I understand that JM will get the backpressure-related metrics every > >>>>> time > >>>>>>> the RestServerEndpoint receives the REST request to get these > >>>> metrics. > >>>>>> But > >>>>>>> I am not sure if RestServerEndpoint is already always receiving the > >>>>> REST > >>>>>>> metrics at regular interval (suppose there is no human manually > >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the > >>>>> interval? > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>>>> For example, let's say every source operator subtask reports this > >>>>>>> metric > >>>>>>>> to > >>>>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each > >>>>>>> subtask > >>>>>>>>> is backpressured roughly 10% of the total time due to traffic > >>>>> spikes > >>>>>>> (and > >>>>>>>>> limited buffer). Then at any given time, there are 1 - 0.9^100 = > >>>>>>> 99.997% > >>>>>>>>> chance that there is at least one subtask that is backpressured. > >>>>> Then > >>>>>>> we > >>>>>>>>> have to wait for at least 10 seconds to check again. > >>>>>>>> > >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like > >>>>>>>> busyTimeMsPerSecond) are not subject to that problem. > >>>>>>>> They are recalculated once every metric fetching interval, and they > >>>>>>> report > >>>>>>>> accurately on average the given subtask spent > >>>>>> busy/idling/backpressured. > >>>>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s. > >>>>>>> > >>>>>>> > >>>>>>> Suppose every subtask is already reporting > >>>> backPressuredTimeMsPerSecond > >>>>>> to > >>>>>>> JM once every 100 ms. If a job has 10 operators (that are not > >>>> chained) > >>>>>> and > >>>>>>> each operator has 100 subtasks, then JM would need to handle 10000 > >>>>>> requests > >>>>>>> per second to receive metrics from these 1000 subtasks. It seems > >>>> like a > >>>>>>> non-trivial overhead for medium-to-large sized jobs and can make JM > >>>> the > >>>>>>> performance bottleneck during job execution. > >>>>>>> > >>>>>>> I would be surprised if Flink is already paying this much overhead > >>>> just > >>>>>> for > >>>>>>> metrics monitoring. That is the main reason I still doubt it is true. > >>>>> Can > >>>>>>> you show where this 100 ms is currently configured? 
> >>>>>>> > >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke > >>>>> the > >>>>>>> REST API at 100 ms interval. Then that means we need to considerably > >>>>>>> increase the network/cpu overhead at JM, where the overhead will > >>>>> increase > >>>>>>> as the number of TM/slots increase, which may pose risk to the > >>>>>> scalability > >>>>>>> of the proposed design. I am not sure we should do this. What do you > >>>>>> think? > >>>>>>> > >>>>>>> > >>>>>>>> > >>>>>>>>> While it will be nice to support additional use-cases > >>>>>>>>> with one proposal, it is probably also reasonable to make > >>>>> incremental > >>>>>>>>> progress and support the low-hanging-fruit use-case first. The > >>>>> choice > >>>>>>>>> really depends on the complexity and the importance of supporting > >>>>> the > >>>>>>>> extra > >>>>>>>>> use-cases. > >>>>>>>> > >>>>>>>> That would be true, if that was a private implementation detail or > >>>> if > >>>>>> the > >>>>>>>> low-hanging-fruit-solution would be on the direct path to the final > >>>>>>>> solution. > >>>>>>>> That's unfortunately not the case here. This will add public facing > >>>>>> API, > >>>>>>>> that we will later need to maintain, no matter what the final > >>>>> solution > >>>>>>> will > >>>>>>>> be, > >>>>>>>> and at the moment at least I don't see it being related to a > >>>>> "perfect" > >>>>>>>> solution. > >>>>>>> > >>>>>>> > >>>>>>> Sure. Then let's decide the final solution first. > >>>>>>> > >>>>>>> > >>>>>>>>> I guess the point is that the suggested approach, which > >>>> dynamically > >>>>>>>>> determines the checkpointing interval based on the backpressure, > >>>>> may > >>>>>>>> cause > >>>>>>>>> regression when the checkpointing interval is relatively low. > >>>> This > >>>>>>> makes > >>>>>>>> it > >>>>>>>>> hard for users to enable this feature in production. It is like > >>>> an > >>>>>>>>> auto-driving system that is not guaranteed to work > >>>>>>>> > >>>>>>>> Yes, creating a more generic solution that would require less > >>>>>>> configuration > >>>>>>>> is usually more difficult then static configurations. > >>>>>>>> It doesn't mean we shouldn't try to do that. Especially that if my > >>>>>>> proposed > >>>>>>>> algorithm wouldn't work good enough, there is > >>>>>>>> an obvious solution, that any source could add a metric, like let > >>>> say > >>>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` > >>>>>>>> could use this as an override to always switch to the > >>>>>>>> "slowCheckpointInterval". I don't think we need it, but that's > >>>> always > >>>>>> an > >>>>>>>> option > >>>>>>>> that would be basically equivalent to your original proposal. Or > >>>> even > >>>>>>>> source could add "suggestedCheckpointInterval : int", and > >>>>>>>> `CheckpointTrigger` could use that value if present as a hint in > >>>> one > >>>>>> way > >>>>>>> or > >>>>>>>> another. > >>>>>>>> > >>>>>>> > >>>>>>> So far we have talked about the possibility of using > >>>> CheckpointTrigger > >>>>>> and > >>>>>>> mentioned the CheckpointTrigger > >>>>>>> and read metric values. > >>>>>>> > >>>>>>> Can you help answer the following questions so that I can understand > >>>>> the > >>>>>>> alternative solution more concretely: > >>>>>>> > >>>>>>> - What is the interface of this CheckpointTrigger? For example, are > >>>> we > >>>>>>> going to give CheckpointTrigger a context that it can use to fetch > >>>>>>> arbitrary metric values? 
This can help us understand what information > >>>>>> this > >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint > >>>> decision. > >>>>>>> - Where is this CheckpointTrigger running? For example, is it going > >>>> to > >>>>>> run > >>>>>>> on the subtask of every source operator? Or is it going to run on the > >>>>> JM? > >>>>>>> - Are we going to provide a default implementation of this > >>>>>>> CheckpointTrigger in Flink that implements the algorithm described > >>>>> below, > >>>>>>> or do we expect each source operator developer to implement their own > >>>>>>> CheckpointTrigger? > >>>>>>> - How can users specify the > >>>>>> fastCheckpointInterval/slowCheckpointInterval? > >>>>>>> For example, will we provide APIs on the CheckpointTrigger that > >>>>> end-users > >>>>>>> can use to specify the checkpointing interval? What would that look > >>>>> like? > >>>>>>> > >>>>>>> Overall, my gut feel is that the alternative approach based on > >>>>>>> CheckpointTrigger is more complicated and harder to use. And it > >>>>> probably > >>>>>>> also has the issues of "having two places to configure checkpointing > >>>>>>> interval" and "giving flexibility for every source to implement a > >>>>>> different > >>>>>>> API" (as mentioned below). > >>>>>>> > >>>>>>> Maybe we can evaluate it more after knowing the answers to the above > >>>>>>> questions. > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>>> > >>>>>>>>> On the other hand, the approach currently proposed in the FLIP is > >>>>>> much > >>>>>>>>> simpler as it does not depend on backpressure. Users specify the > >>>>>> extra > >>>>>>>>> interval requirement on the specific sources (e.g. HybridSource, > >>>>>> MySQL > >>>>>>>> CDC > >>>>>>>>> Source) and can easily know the checkpointing interval will be > >>>> used > >>>>>> on > >>>>>>>> the > >>>>>>>>> continuous phase of the corresponding source. This is pretty much > >>>>>> same > >>>>>>> as > >>>>>>>>> how users use the existing execution.checkpointing.interval > >>>> config. > >>>>>> So > >>>>>>>>> there is no extra concern of regression caused by this approach. > >>>>>>>> > >>>>>>>> To an extent, but as I have already previously mentioned I really > >>>>>> really > >>>>>>> do > >>>>>>>> not like idea of: > >>>>>>>> - having two places to configure checkpointing interval (config > >>>>> file > >>>>>>> and > >>>>>>>> in the Source builders) > >>>>>>>> - giving flexibility for every source to implement a different > >>>> API > >>>>>> for > >>>>>>>> that purpose > >>>>>>>> - creating a solution that is not generic enough, so that we will > >>>>>> need > >>>>>>> a > >>>>>>>> completely different mechanism in the future anyway > >>>>>>>> > >>>>>>> > >>>>>>> Yeah, I understand different developers might have different > >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a > >>>>> perfect > >>>>>>> solution and we have to choose based on the pros/cons of these > >>>>> solutions. > >>>>>>> > >>>>>>> I agree with you that, all things being equal, it is preferable to 1) > >>>>>> have > >>>>>>> one place to configure checkpointing intervals, 2) have all source > >>>>>>> operators use the same API, and 3) create a solution that is generic > >>>>> and > >>>>>>> last lasting. Note that these three goals affects the usability and > >>>>>>> extensibility of the API, but not necessarily the > >>>> stability/performance > >>>>>> of > >>>>>>> the production job. > >>>>>>> > >>>>>>> BTW, there are also other preferrable goals. 
For example, it is very useful for the job's behavior to be predictable and interpretable so that SREs can operate/debug Flink more easily. We can list these pros/cons altogether later.
> >>>>>>>
> >>>>>>> I am wondering if we can first agree on the priority of the goals we want to achieve. IMO, it is a hard requirement for the user-facing API to be clearly defined, and users should be able to use the API without concern of regression. And this requirement is more important than the other goals discussed above because it is related to the stability/performance of the production job. What do you think?
> >>>>>>>
> >>>>>>>>> Sounds good. Looking forward to learning more ideas.
> >>>>>>>>
> >>>>>>>> I have thought about this a bit more, and I think we don't need to check for the backpressure status, or how much overloaded all of the operators are.
> >>>>>>>> We could just check three things for source operators:
> >>>>>>>> 1. pendingRecords (backlog length)
> >>>>>>>> 2. numRecordsInPerSecond
> >>>>>>>> 3. backPressuredTimeMsPerSecond
> >>>>>>>>
> >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
> >>>>>>>> // Next line calculates how many records we can consume from the backlog, assuming
> >>>>>>>> // that magically the reason behind a backpressure vanishes. We will use this only as
> >>>>>>>> // a safeguard against scenarios like for example if backpressure was caused by some
> >>>>>>>> // intermittent failure/performance degradation.
> >>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
> >>>>>>>
> >>>>>>> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond = 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 * metricsUpdateInterval according to the above algorithm.
> >>>>>>>
> >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
> >>>>>>>
> >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the backlog as
> >>>>>>>> // a safeguard against intermittent back pressure problems, so that we don't
> >>>>>>>> // calculate the next checkpoint interval far far in the future, while the backpressure
> >>>>>>>> // goes away before we will recalculate metrics and a new checkpointing interval
> >>>>>>>> timeToConsumeBacklog = (pendingRecords - maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> >>>>>>>>
> >>>>>>>> Then we can use those numbers to calculate the desired checkpoint interval, for example like this:
> >>>>>>>>
> >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; // this may need some refining
> >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval, calculatedCheckpointInterval), slowCheckpointInterval);
> >>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> >>>>>>>>
> >>>>>>>> WDYT?
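For illustration only, the calculation sketched above could be written roughly as follows in Java, using the corrected denominator from Dong's question and a safeguard for subtasks that are backpressured close to 100% of the time. The method and parameter names are hypothetical; this is not an existing Flink API.

// Illustrative sketch only (hypothetical names), not an actual Flink implementation.
static long nextCheckpointIntervalMs(
        double pendingRecords,                 // backlog length reported by the source
        double numRecordsInPerSecond,
        double backPressuredTimeMsPerSecond,
        long metricsUpdateIntervalMs,          // e.g. 10_000
        long fastCheckpointIntervalMs,
        long slowCheckpointIntervalMs) {
    // Fraction of time the subtask is not backpressured, floored so we never divide by ~0
    // (the safeguard for backPressuredTimeMsPerSecond > 950 mentioned in the thread).
    double notBackpressured = Math.max(0.05, 1.0 - backPressuredTimeMsPerSecond / 1000.0);
    // Records that could be consumed within one metrics interval if the backpressure vanished.
    double maxRecordsConsumedWithoutBackpressure =
            (numRecordsInPerSecond / notBackpressured) * (metricsUpdateIntervalMs / 1000.0);
    // Exclude those records so an intermittent backpressure spike does not push the
    // next checkpoint far into the future.
    double timeToConsumeBacklogSec =
            Math.max(0.0, pendingRecords - maxRecordsConsumedWithoutBackpressure)
                    / numRecordsInPerSecond;
    long calculatedIntervalMs = (long) (timeToConsumeBacklogSec * 1000.0 / 10.0);
    return Math.min(
            Math.max(fastCheckpointIntervalMs, calculatedIntervalMs),
            slowCheckpointIntervalMs);
}

With a 3-minute fast interval and a 30-minute slow interval, this returns the fast interval whenever the estimated time to drain the backlog is under 30 minutes, which is the behavior Dong analyzes in the reply that follows.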
> >>>>>>> I think the idea of the above algorithm is to incline to use the fastCheckpointInterval unless we are very sure the backlog will take a long time to process. This can alleviate the concern of regression during the continuous_unbounded phase, since we are more likely to use the fastCheckpointInterval. However, it can cause regression during the bounded phase.
> >>>>>>>
> >>>>>>> I will use a concrete example to explain the risk of regression:
> >>>>>>> - The user is using HybridSource to read from HDFS followed by Kafka. The data in HDFS is old and there is no need for data freshness for the data in HDFS.
> >>>>>>> - The user configures the job as below:
> >>>>>>>   - fastCheckpointInterval = 3 minutes
> >>>>>>>   - slowCheckpointInterval = 30 minutes
> >>>>>>>   - metricsUpdateInterval = 100 ms
> >>>>>>>
> >>>>>>> Using the above formula, we know that once pendingRecords <= numRecordsInPerSecond * 30 minutes, then calculatedCheckpointInterval <= 3 minutes, meaning that we will use fastCheckpointInterval as the checkpointing interval. Then in the last 30 minutes of the bounded phase, the checkpointing frequency will be 10X higher than what the user wants.
> >>>>>>>
> >>>>>>> Also note that the same issue would considerably limit the benefits of the algorithm. For example, during the continuous phase, the algorithm will only be better than the approach in FLIP-309 when there is at least 30 minutes' worth of backlog in the source.
> >>>>>>>
> >>>>>>> Sure, having a slower checkpointing interval in this extreme case (where there is a 30-minute backlog in the continuous_unbounded phase) is still useful when it happens. But since this is the uncommon case, and the right solution is probably to do capacity planning to avoid it in the first place, I am not sure it is worth optimizing for this case at the cost of regression in the bounded phase and reduced operational predictability for users (e.g. what checkpointing interval should I expect at this stage of the job?).
> >>>>>>>
> >>>>>>> I think the fundamental issue with this algorithm is that it is applied to both the bounded phases and the continuous_unbounded phases without knowing which phase the job is running in. The only information it can access is the backlog. But two sources with the same amount of backlog do not necessarily have the same data freshness requirement.
> >>>>>>>
> >>>>>>> In this particular example, users know that the data in HDFS is very old and there is no need for data freshness. Users can express such signals via the per-source API proposed in the FLIP. This is why the current approach in FLIP-309 can be better in this case.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Piotrek
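For completeness, the "hacky" override Piotr mentions earlier in the thread (a source-reported "processingBacklog: true/false" metric that short-circuits straight to the slow interval) could be layered on top of the same calculation. Again, the names are hypothetical and this is only a sketch, not a Flink API:

// Illustrative sketch only: short-circuit to the slow interval when any source
// reports that it is processing backlog, otherwise fall back to the calculated value.
static long nextIntervalWithBacklogOverride(
        boolean anySourceProcessingBacklog,    // e.g. aggregated from a "processingBacklog" metric
        long calculatedIntervalMs,
        long fastCheckpointIntervalMs,
        long slowCheckpointIntervalMs) {
    if (anySourceProcessingBacklog) {
        return slowCheckpointIntervalMs;
    }
    return Math.min(
            Math.max(fastCheckpointIntervalMs, calculatedIntervalMs),
            slowCheckpointIntervalMs);
}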