Thank you Dong for driving this FLIP. The new design looks good to me!
Best, Jark > On Jun 27, 2023, at 14:38, Dong Lin <lindon...@gmail.com> wrote: > > Thank you Leonard for the review! > > Hi Piotr, do you have any comments on the latest proposal? > > I am wondering if it is OK to start the voting thread this week. > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote: > >> Thanks Dong for driving this FLIP forward! >> >> Introducing the `backlog status` concept for Flink jobs makes sense to me for the >> following reasons: >> >> From the concept/API design perspective, it’s more general and natural than the >> above proposals, as it can be used in HybridSource for bounded records, CDC >> Source for the history snapshot, and general sources like KafkaSource for >> historical messages. >> >> From the use cases/requirements perspective, I’ve seen many users manually set a larger >> checkpoint interval during backfilling and then a shorter checkpoint >> interval for real-time processing in their production environments as a >> Flink application optimization. Now the Flink framework can apply this >> optimization without requiring the user to reset the checkpoint interval and >> restart the job multiple times. >> >> Following the support for a larger checkpoint interval for jobs under backlog status >> in the current FLIP, we can explore supporting larger parallelism/memory/CPU >> for jobs under backlog status in the future. >> >> In short, the updated FLIP looks good to me. >> >> >> Best, >> Leonard >> >> >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote: >>> >>> Hi Piotr, >>> >>> Thanks again for proposing the isProcessingBacklog concept. >>> >>> After discussing with Becket Qin and thinking about this more, I agree it >>> is a better idea to add a top-level concept to all source operators to >>> address the target use-case. >>> >>> The main reason that changed my mind is that isProcessingBacklog can be >>> described as an inherent/natural attribute of every source instance and >> its >>> semantics do not need to depend on any specific checkpointing policy. >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we >>> have considered so far (e.g. HybridSource and MySQL CDC source) without >>> asking users to explicitly configure the per-source behavior, which >> indeed >>> provides a better user experience. >>> >>> I have updated the FLIP based on the latest suggestions. The latest FLIP >> no >>> longer introduces a per-source config that can be used by end-users. While >> I >>> agree with you that CheckpointTrigger can be a useful feature to address >>> additional use-cases, I am not sure it is necessary for the use-case >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately >>> in another FLIP? >>> >>> Can you help take another look at the updated FLIP? >>> >>> Best, >>> Dong >>> >>> >>> >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> >>> wrote: >>> >>>> Hi Dong, >>>> >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being >>>>> "backpressured" at a given time (due to random traffic spikes). Then at >>>> any >>>>> given time, the chance of the job >>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate >> the >>>>> backpressure metric once a second, the estimated time for the job >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = >> 23163 >>>>> sec = 6.4 hours. >>>>> >>>>> This means that the job will effectively always use the longer >>>>> checkpointing interval. It looks like a real concern, right?
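>>>>> (Spelling out the arithmetic, under the assumption that the 1000 subtasks
>>>>> are backpressured independently: the chance that no subtask is backpressured
>>>>> during a given one-second evaluation is 0.99^1000 ≈ 4.3e-5, so the expected
>>>>> number of one-second evaluations until the job is observed as
>>>>> not-backpressured is about 1 / 4.3e-5 ≈ 23163, i.e. roughly 6.4 hours.)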
>>>> >>>> Sorry I don't understand where you are getting those numbers from. >>>> Instead of trying to find loophole after loophole, could you try to >> think >>>> how a given loophole could be improved/solved? >>>> >>>>> Hmm... I honestly think it will be useful to know the APIs due to the >>>>> following reasons. >>>> >>>> Please propose something. I don't think it's needed. >>>> >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the >>>> APIs >>>>> of this alternative approach be more or less usable? >>>> >>>> Everything that you originally wanted to achieve in FLIP-309, you could >> do >>>> as well in my proposal. >>>> Vide my many mentions of the "hacky solution". >>>> >>>>> - Can these APIs reliably address the extra use-case (e.g. allow >>>>> checkpointing interval to change dynamically even during the unbounded >>>>> phase) as it claims? >>>> >>>> I don't see why not. >>>> >>>>> - Can these APIs be decoupled from the APIs currently proposed in >>>> FLIP-309? >>>> >>>> Yes >>>> >>>>> For example, if the APIs of this alternative approach can be decoupled >>>> from >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to >>>>> work on this extra use-case with a more advanced/complicated design >>>>> separately in a followup work. >>>> >>>> As I voiced my concerns previously, the current design of FLIP-309 would >>>> clog the public API and in the long run confuse the users. IMO It's >>>> addressing the >>>> problem in the wrong place. >>>> >>>>> Hmm.. do you mean we can do the following: >>>>> - Have all source operators emit a metric named "processingBacklog". >>>>> - Add a job-level config that specifies "the checkpointing interval to >> be >>>>> used when any source is processing backlog". >>>>> - The JM collects the "processingBacklog" periodically from all source >>>>> operators and uses the newly added config value as appropriate. >>>> >>>> Yes. >>>> >>>>> The challenge with this approach is that we need to define the >> semantics >>>> of >>>>> this "processingBacklog" metric and have all source operators >>>>> implement this metric. I am not sure we are able to do this yet without >>>>> having users explicitly provide this information on a per-source basis. >>>>> >>>>> Suppose the job read from a bounded Kafka source, should it emit >>>>> "processingBacklog=true"? If yes, then the job might use long >>>> checkpointing >>>>> interval even >>>>> if the job is asked to process data starting from now to the next 1 >> hour. >>>>> If no, then the job might use the short checkpointing interval >>>>> even if the job is asked to re-process data starting from 7 days ago. >>>> >>>> Yes. The same can be said of your proposal. Your proposal has the very >> same >>>> issues >>>> that every source would have to implement it differently, most sources >>>> would >>>> have no idea how to properly calculate the new requested checkpoint >>>> interval, >>>> for those that do know how to do that, user would have to configure >> every >>>> source >>>> individually and yet again we would end up with a system, that works >> only >>>> partially in >>>> some special use cases (HybridSource), that's confusing the users even >>>> more. >>>> >>>> That's why I think the more generic solution, working primarily on the >> same >>>> metrics that are used by various auto scaling solutions (like Flink K8s >>>> operator's >>>> autosaler) would be better. The hacky solution I proposed to: >>>> 1. 
show you that the generic solution is simply a superset of your >> proposal >>>> 2. if you are adamant that busyness/backpressured/records processing >>>> rate/pending records >>>> metrics wouldn't cover your use case sufficiently (imo they can), >> then >>>> you can very easily >>>> enhance this algorithm with using some hints from the sources. Like >>>> "processingBacklog==true" >>>> to short circuit the main algorithm, if `processingBacklog` is >>>> available. >>>> >>>> Best, >>>> Piotrek >>>> >>>> >>>> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a): >>>> >>>>> Hi again Piotr, >>>>> >>>>> Thank you for the reply. Please see my reply inline. >>>>> >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski < >>>> piotr.nowoj...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi again Dong, >>>>>> >>>>>>> I understand that JM will get the backpressure-related metrics every >>>>> time >>>>>>> the RestServerEndpoint receives the REST request to get these >>>> metrics. >>>>>> But >>>>>>> I am not sure if RestServerEndpoint is already always receiving the >>>>> REST >>>>>>> metrics at regular interval (suppose there is no human manually >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the >>>>> interval? >>>>>> >>>>>> Good catch, I've thought that metrics are pre-emptively sent to JM >>>> every >>>>> 10 >>>>>> seconds. >>>>>> Indeed that's not the case at the moment, and that would have to be >>>>>> improved. >>>>>> >>>>>>> I would be surprised if Flink is already paying this much overhead >>>> just >>>>>> for >>>>>>> metrics monitoring. That is the main reason I still doubt it is true. >>>>> Can >>>>>>> you show where this 100 ms is currently configured? >>>>>>> >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke >>>>> the >>>>>>> REST API at 100 ms interval. Then that means we need to considerably >>>>>>> increase the network/cpu overhead at JM, where the overhead will >>>>> increase >>>>>>> as the number of TM/slots increase, which may pose risk to the >>>>>> scalability >>>>>>> of the proposed design. I am not sure we should do this. What do you >>>>>> think? >>>>>> >>>>>> Sorry. I didn't mean metric should be reported every 100ms. I meant >>>> that >>>>>> "backPressuredTimeMsPerSecond (metric) would report (a value of) >>>>> 100ms/s." >>>>>> once per metric interval (10s?). >>>>>> >>>>> >>>>> Suppose there are 1000 subtask and each subtask has 1% chance of being >>>>> "backpressured" at a given time (due to random traffic spikes). Then at >>>> any >>>>> given time, the chance of the job >>>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate >> the >>>>> backpressure metric once a second, the estimated time for the job >>>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = >> 23163 >>>>> sec = 6.4 hours. >>>>> >>>>> This means that the job will effectively always use the longer >>>>> checkpointing interval. It looks like a real concern, right? >>>>> >>>>> >>>>>>> - What is the interface of this CheckpointTrigger? For example, are >>>> we >>>>>>> going to give CheckpointTrigger a context that it can use to fetch >>>>>>> arbitrary metric values? This can help us understand what information >>>>>> this >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint >>>> decision. >>>>>> >>>>>> I honestly don't think this is important at this stage of the >>>> discussion. >>>>>> It could have >>>>>> whatever interface we would deem to be best. 
Required things: >>>>>> >>>>>> - access to at least a subset of metrics that the given >>>>> `CheckpointTrigger` >>>>>> requests, >>>>>> for example via some registration mechanism, so we don't have to >>>> fetch >>>>>> all of the >>>>>> metrics all the time from TMs. >>>>>> - some way to influence `CheckpointCoordinator`. Either via manually >>>>>> triggering >>>>>> checkpoints, and/or ability to change the checkpointing interval. >>>>>> >>>>> >>>>> Hmm... I honestly think it will be useful to know the APIs due to the >>>>> following reasons. >>>>> >>>>> We would need to know the concrete APIs to gauge the following: >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the >>>> APIs >>>>> of this alternative approach be more or less usable? >>>>> - Can these APIs reliably address the extra use-case (e.g. allow >>>>> checkpointing interval to change dynamically even during the unbounded >>>>> phase) as it claims? >>>>> - Can these APIs be decoupled from the APIs currently proposed in >>>> FLIP-309? >>>>> >>>>> For example, if the APIs of this alternative approach can be decoupled >>>> from >>>>> the APIs currently proposed in FLIP-309, then it might be reasonable to >>>>> work on this extra use-case with a more advanced/complicated design >>>>> separately in a followup work. >>>>> >>>>> >>>>>>> - Where is this CheckpointTrigger running? For example, is it going >>>> to >>>>>> run >>>>>>> on the subtask of every source operator? Or is it going to run on the >>>>> JM? >>>>>> >>>>>> IMO on the JM. >>>>>> >>>>>>> - Are we going to provide a default implementation of this >>>>>>> CheckpointTrigger in Flink that implements the algorithm described >>>>> below, >>>>>>> or do we expect each source operator developer to implement their own >>>>>>> CheckpointTrigger? >>>>>> >>>>>> As I mentioned before, I think we should provide at the very least the >>>>>> implementation >>>>>> that replaces the current triggering mechanism (statically configured >>>>>> checkpointing interval) >>>>>> and it would be great to provide the backpressure monitoring trigger >> as >>>>>> well. >>>>>> >>>>> >>>>> I agree that if there is a good use-case that can be addressed by the >>>>> proposed CheckpointTrigger, then it is reasonable >>>>> to add CheckpointTrigger and replace the current triggering mechanism >>>> with >>>>> it. >>>>> >>>>> I also agree that we will likely find such a use-case. For example, >>>> suppose >>>>> the source records have event timestamps, then it is likely >>>>> that we can use the trigger to dynamically control the checkpointing >>>>> interval based on the difference between the watermark and current >> system >>>>> time. >>>>> >>>>> But I am not sure the addition of this CheckpointTrigger should be >>>> coupled >>>>> with FLIP-309. Whether or not it is coupled probably depends on the >>>>> concrete API design around CheckpointTrigger. >>>>> >>>>> If you would be adamant that the backpressure monitoring doesn't cover >>>> well >>>>>> enough your use case, I would be ok to provide the hacky version that >> I >>>>>> also mentioned >>>>>> before: >>>>> >>>>> >>>>>> """ >>>>>> Especially that if my proposed algorithm wouldn't work good enough, >>>> there >>>>>> is >>>>>> an obvious solution, that any source could add a metric, like let say >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` >>>>>> could use this as an override to always switch to the >>>>>> "slowCheckpointInterval". 
I don't think we need it, but that's always >>>> an >>>>>> option >>>>>> that would be basically equivalent to your original proposal. >>>>>> """ >>>>>> >>>>> >>>>> Hmm.. do you mean we can do the following: >>>>> - Have all source operators emit a metric named "processingBacklog". >>>>> - Add a job-level config that specifies "the checkpointing interval to >> be >>>>> used when any source is processing backlog". >>>>> - The JM collects the "processingBacklog" periodically from all source >>>>> operators and uses the newly added config value as appropriate. >>>>> >>>>> The challenge with this approach is that we need to define the >> semantics >>>> of >>>>> this "processingBacklog" metric and have all source operators >>>>> implement this metric. I am not sure we are able to do this yet without >>>>> having users explicitly provide this information on a per-source basis. >>>>> >>>>> Suppose the job read from a bounded Kafka source, should it emit >>>>> "processingBacklog=true"? If yes, then the job might use long >>>> checkpointing >>>>> interval even >>>>> if the job is asked to process data starting from now to the next 1 >> hour. >>>>> If no, then the job might use the short checkpointing interval >>>>> even if the job is asked to re-process data starting from 7 days ago. >>>>> >>>>> >>>>>> >>>>>>> - How can users specify the >>>>>> fastCheckpointInterval/slowCheckpointInterval? >>>>>>> For example, will we provide APIs on the CheckpointTrigger that >>>>> end-users >>>>>>> can use to specify the checkpointing interval? What would that look >>>>> like? >>>>>> >>>>>> Also as I mentioned before, just like metric reporters are configured: >>>>>> >>>>>> >>>>> >>>> >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/ >>>>>> Every CheckpointTrigger could have its own custom configuration. >>>>>> >>>>>>> Overall, my gut feel is that the alternative approach based on >>>>>>> CheckpointTrigger is more complicated >>>>>> >>>>>> Yes, as usual, more generic things are more complicated, but often >> more >>>>>> useful in the long run. >>>>>> >>>>>>> and harder to use. >>>>>> >>>>>> I don't agree. Why setting in config >>>>>> >>>>>> execution.checkpointing.trigger: >>>> BackPressureMonitoringCheckpointTrigger >>>>>> >>>>>> >>>>> >>>> >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: >>>>>> 1s >>>>>> >>>>>> >>>>> >>>> >> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: >>>>>> 30s >>>>>> >>>>>> that we could even provide a shortcut to the above construct via: >>>>>> >>>>>> execution.checkpointing.fast-interval: 1s >>>>>> execution.checkpointing.slow-interval: 30s >>>>>> >>>>>> is harder compared to setting two/three checkpoint intervals, one in >>>> the >>>>>> config/or via `env.enableCheckpointing(x)`, >>>>>> secondly passing one/two (fast/slow) values on the source itself? >>>>>> >>>>> >>>>> If we can address the use-case by providing just the two job-level >> config >>>>> as described above, I agree it will indeed be simpler. >>>>> >>>>> I have tried to achieve this goal. But the caveat is that it requires >>>> much >>>>> more work than described above in order to give the configs >> well-defined >>>>> semantics. So I find it simpler to just use the approach in FLIP-309. >>>>> >>>>> Let me explain my concern below. It will be great if you or someone >> else >>>>> can help provide a solution. 
>>>>> >>>>> 1) We need to clearly document when the fast-interval and slow-interval >>>>> will be used so that users can derive the expected behavior of the job >>>> and >>>>> be able to config these values. >>>>> >>>>> 2) The trigger of fast/slow interval depends on the behavior of the >>>> source >>>>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source >>>>> operator (e.g. boundedness) can describe the target behavior. For >>>> example, >>>>> MySQL CDC internally has two phases, namely snapshot phase and binlog >>>>> phase, which are not explicitly exposed to its users via source >> operator >>>>> API. And we probably should not enumerate all internal phases of all >>>> source >>>>> operators that are affected by fast/slow interval. >>>>> >>>>> 3) An alternative approach might be to define a new concept (e.g. >>>>> processingBacklog) that is applied to all source operators. Then the >>>>> fast/slow interval's documentation can depend on this concept. That >> means >>>>> we have to add a top-level concept (similar to source boundedness) and >>>>> require all source operators to specify how they enforce this concept >>>> (e.g. >>>>> FileSystemSource always emits processingBacklog=true). And there might >> be >>>>> cases where the source itself (e.g. a bounded Kafka Source) can not >>>>> automatically derive the value of this concept, in which case we need >> to >>>>> provide option for users to explicitly specify the value for this >> concept >>>>> on a per-source basis. >>>>> >>>>> >>>>> >>>>>>> And it probably also has the issues of "having two places to >>>> configure >>>>>> checkpointing >>>>>>> interval" and "giving flexibility for every source to implement a >>>>>> different >>>>>>> API" (as mentioned below). >>>>>> >>>>>> No, it doesn't. >>>>>> >>>>>>> IMO, it is a hard-requirement for the user-facing API to be >>>>>>> clearly defined and users should be able to use the API without >>>> concern >>>>>> of >>>>>>> regression. And this requirement is more important than the other >>>> goals >>>>>>> discussed above because it is related to the stability/performance of >>>>> the >>>>>>> production job. What do you think? >>>>>> >>>>>> I don't agree with this. There are many things that work something in >>>>>> between perfectly and well enough >>>>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%), >>>>> while >>>>>> still being very useful. >>>>>> Good examples are: selection of state backend, unaligned checkpoints, >>>>>> buffer debloating but frankly if I go >>>>>> through list of currently available config options, something like >> half >>>>> of >>>>>> them can cause regressions. Heck, >>>>>> even Flink itself doesn't work perfectly in 100% of the use cases, due >>>>> to a >>>>>> variety of design choices. Of >>>>>> course, the more use cases are fine with said feature, the better, but >>>> we >>>>>> shouldn't fixate to perfectly cover >>>>>> 100% of the cases, as that's impossible. >>>>>> >>>>>> In this particular case, if back pressure monitoring trigger can work >>>>> well >>>>>> enough in 95% of cases, I would >>>>>> say that's already better than the originally proposed alternative, >>>> which >>>>>> doesn't work at all if user has a large >>>>>> backlog to reprocess from Kafka, including when using HybridSource >>>> AFTER >>>>>> the switch to Kafka has >>>>>> happened. 
For the remaining 5%, we should try to improve the behaviour >>>>> over >>>>>> time, but ultimately, users can >>>>>> decide to just run a fixed checkpoint interval (or at worst use the >>>> hacky >>>>>> checkpoint trigger that I mentioned >>>>>> before a couple of times). >>>>>> >>>>>> Also to be pedantic, if a user naively selects slow-interval in your >>>>>> proposal to 30 minutes, when that user's >>>>>> job fails on average every 15-20minutes, his job can end up in a state >>>>> that >>>>>> it can not make any progress, >>>>>> this arguably is quite serious regression. >>>>>> >>>>> >>>>> I probably should not say it is "hard requirement". After all there are >>>>> pros/cons. We will need to consider implementation complexity, >> usability, >>>>> extensibility etc. >>>>> >>>>> I just don't think we should take it for granted to introduce >> regression >>>>> for one use-case in order to support another use-case. If we can not >> find >>>>> an algorithm/solution that addresses >>>>> both use-case well, I hope we can be open to tackle them separately so >>>> that >>>>> users can choose the option that best fits their needs. >>>>> >>>>> All things else being equal, I think it is preferred for user-facing >> API >>>> to >>>>> be clearly defined and let users should be able to use the API without >>>>> concern of regression. >>>>> >>>>> Maybe we can list pros/cons for the alternative approaches we have been >>>>> discussing and see choose the best approach. And maybe we will end up >>>>> finding that use-case >>>>> which needs CheckpointTrigger can be tackled separately from the >> use-case >>>>> in FLIP-309. >>>>> >>>>> >>>>>>> I am not sure if there is a typo. Because if >>>>> backPressuredTimeMsPerSecond >>>>>> = >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure = >>>> numRecordsInPerSecond / >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm. >>>>>>> >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = >>>>>> (numRecordsInPerSecond >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * >>>> metricsUpdateInterval"? >>>>>> >>>>>> It looks like there is indeed some mistake in my proposal above. Yours >>>>> look >>>>>> more correct, it probably >>>>>> still needs some safeguard/special handling if >>>>>> `backPressuredTimeMsPerSecond > 950` >>>>>> >>>>>>> The only information it can access is the backlog. >>>>>> >>>>>> Again no. It can access whatever we want to provide to it. >>>>>> >>>>>> Regarding the rest of your concerns. It's a matter of tweaking the >>>>>> parameters and the algorithm itself, >>>>>> and how much safety-net do we want to have. Ultimately, I'm pretty >> sure >>>>>> that's a (for 95-99% of cases) >>>>>> solvable problem. If not, there is always the hacky solution, that >>>> could >>>>> be >>>>>> even integrated into this above >>>>>> mentioned algorithm as a short circuit to always reach >> `slow-interval`. >>>>>> >>>>>> Apart of that, you picked 3 minutes as the checkpointing interval in >>>> your >>>>>> counter example. In most cases >>>>>> any interval above 1 minute would inflict pretty negligible overheads, >>>> so >>>>>> all in all, I would doubt there is >>>>>> a significant benefit (in most cases) of increasing 3 minute >> checkpoint >>>>>> interval to anything more, let alone >>>>>> 30 minutes. >>>>>> >>>>> >>>>> I am not sure we should design the algorithm with the assumption that >> the >>>>> short checkpointing interval will always be higher than 1 minute etc. 
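>>>>> For concreteness, the corrected calculation together with the safeguard
>>>>> mentioned above could look roughly like the sketch below. This is only an
>>>>> illustration of the algorithm discussed in this thread, not the FLIP
>>>>> proposal; the metric names are the ones used above, the 950 ms/s cap is one
>>>>> possible way to implement the safeguard, intervals are in seconds, and it
>>>>> assumes numRecordsInPerSecond > 0:
>>>>>
>>>>> static long nextCheckpointIntervalSec(
>>>>>     double pendingRecords,               // backlog length reported by the source
>>>>>     double numRecordsInPerSecond,        // observed source throughput
>>>>>     double backPressuredTimeMsPerSecond, // 0..1000
>>>>>     double metricsUpdateIntervalSec,     // e.g. 10
>>>>>     long fastIntervalSec,
>>>>>     long slowIntervalSec) {
>>>>>   // safeguard: avoid dividing by ~0 when the subtask is almost fully backpressured
>>>>>   double backpressuredFraction = Math.min(backPressuredTimeMsPerSecond, 950.0) / 1000.0;
>>>>>   // records that could be drained in one metric interval if the backpressure vanished
>>>>>   double maxRecordsConsumedWithoutBackpressure =
>>>>>       numRecordsInPerSecond / (1.0 - backpressuredFraction) * metricsUpdateIntervalSec;
>>>>>   double timeToConsumeBacklogSec =
>>>>>       Math.max(0.0, pendingRecords - maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond;
>>>>>   long calculated = (long) (timeToConsumeBacklogSec / 10); // "this may need some refining"
>>>>>   return Math.min(Math.max(fastIntervalSec, calculated), slowIntervalSec);
>>>>> }
>>>>>
>>>>> With a 3-minute fast interval and a 30-minute slow interval, this only reaches
>>>>> the slow interval once the backlog is worth at least 5 hours of processing, and
>>>>> it falls back to the fast interval whenever the backlog is below roughly 30
>>>>> minutes' worth.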
>>>>> >>>>> I agree the proposed algorithm can solve most cases where the resource >> is >>>>> sufficient and there is always no backlog in source subtasks. On the >>>> other >>>>> hand, what makes SRE >>>>> life hard is probably the remaining 1-5% cases where the traffic is >> spiky >>>>> and the cluster is reaching its capacity limit. The ability to predict >>>> and >>>>> control Flink job's behavior (including checkpointing interval) can >>>>> considerably reduce the burden of manging Flink jobs. >>>>> >>>>> Best, >>>>> Dong >>>>> >>>>> >>>>>> >>>>>> Best, >>>>>> Piotrek >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a): >>>>>> >>>>>>> Hi Piotr, >>>>>>> >>>>>>> Thanks for the explanations. I have some followup questions below. >>>>>>> >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org >>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> >>>>>>>> Thanks for chipping in the discussion Ahmed! >>>>>>>> >>>>>>>> Regarding using the REST API. Currently I'm leaning towards >>>>>> implementing >>>>>>>> this feature inside the Flink itself, via some pluggable interface. >>>>>>>> REST API solution would be tempting, but I guess not everyone is >>>>> using >>>>>>>> Flink Kubernetes Operator. >>>>>>>> >>>>>>>> @Dong >>>>>>>> >>>>>>>>> I am not sure metrics such as isBackPressured are already sent to >>>>> JM. >>>>>>>> >>>>>>>> Fetching code path on the JM: >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture >>>>>>>> >>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add >>>>>>>> >>>>>>>> Example code path accessing Task level metrics via JM using the >>>>>>>> `MetricStore`: >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler >>>>>>>> >>>>>>> >>>>>>> Thanks for the code reference. I checked the code that invoked these >>>>> two >>>>>>> classes and found the following information: >>>>>>> >>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis currently invoked >>>> only >>>>>>> when AggregatingJobsMetricsHandler is invoked. >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by >>>>>>> WebMonitorEndpoint#initializeHandlers >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by >>>>>> RestServerEndpoint. >>>>>>> And RestServerEndpoint invokes these handlers in response to external >>>>>> REST >>>>>>> request. >>>>>>> >>>>>>> I understand that JM will get the backpressure-related metrics every >>>>> time >>>>>>> the RestServerEndpoint receives the REST request to get these >>>> metrics. >>>>>> But >>>>>>> I am not sure if RestServerEndpoint is already always receiving the >>>>> REST >>>>>>> metrics at regular interval (suppose there is no human manually >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the >>>>> interval? >>>>>>> >>>>>>> >>>>>>> >>>>>>>>> For example, let's say every source operator subtask reports this >>>>>>> metric >>>>>>>> to >>>>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each >>>>>>> subtask >>>>>>>>> is backpressured roughly 10% of the total time due to traffic >>>>> spikes >>>>>>> (and >>>>>>>>> limited buffer). Then at any given time, there are 1 - 0.9^100 = >>>>>>> 99.997% >>>>>>>>> chance that there is at least one subtask that is backpressured. >>>>> Then >>>>>>> we >>>>>>>>> have to wait for at least 10 seconds to check again. 
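>>>>>>>>> (Assuming the subtasks are backpressured independently: 0.9^100 ≈ 2.7e-5,
>>>>>>>>> so 1 - 0.9^100 ≈ 99.997%, i.e. a boolean "is any subtask backpressured"
>>>>>>>>> signal would be true almost all of the time.)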
>>>>>>>> >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like >>>>>>>> busyTimeMsPerSecond) are not subject to that problem. >>>>>>>> They are recalculated once every metric fetching interval, and they >>>>>>> report >>>>>>>> accurately on average the given subtask spent >>>>>> busy/idling/backpressured. >>>>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s. >>>>>>> >>>>>>> >>>>>>> Suppose every subtask is already reporting >>>> backPressuredTimeMsPerSecond >>>>>> to >>>>>>> JM once every 100 ms. If a job has 10 operators (that are not >>>> chained) >>>>>> and >>>>>>> each operator has 100 subtasks, then JM would need to handle 10000 >>>>>> requests >>>>>>> per second to receive metrics from these 1000 subtasks. It seems >>>> like a >>>>>>> non-trivial overhead for medium-to-large sized jobs and can make JM >>>> the >>>>>>> performance bottleneck during job execution. >>>>>>> >>>>>>> I would be surprised if Flink is already paying this much overhead >>>> just >>>>>> for >>>>>>> metrics monitoring. That is the main reason I still doubt it is true. >>>>> Can >>>>>>> you show where this 100 ms is currently configured? >>>>>>> >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke >>>>> the >>>>>>> REST API at 100 ms interval. Then that means we need to considerably >>>>>>> increase the network/cpu overhead at JM, where the overhead will >>>>> increase >>>>>>> as the number of TM/slots increase, which may pose risk to the >>>>>> scalability >>>>>>> of the proposed design. I am not sure we should do this. What do you >>>>>> think? >>>>>>> >>>>>>> >>>>>>>> >>>>>>>>> While it will be nice to support additional use-cases >>>>>>>>> with one proposal, it is probably also reasonable to make >>>>> incremental >>>>>>>>> progress and support the low-hanging-fruit use-case first. The >>>>> choice >>>>>>>>> really depends on the complexity and the importance of supporting >>>>> the >>>>>>>> extra >>>>>>>>> use-cases. >>>>>>>> >>>>>>>> That would be true, if that was a private implementation detail or >>>> if >>>>>> the >>>>>>>> low-hanging-fruit-solution would be on the direct path to the final >>>>>>>> solution. >>>>>>>> That's unfortunately not the case here. This will add public facing >>>>>> API, >>>>>>>> that we will later need to maintain, no matter what the final >>>>> solution >>>>>>> will >>>>>>>> be, >>>>>>>> and at the moment at least I don't see it being related to a >>>>> "perfect" >>>>>>>> solution. >>>>>>> >>>>>>> >>>>>>> Sure. Then let's decide the final solution first. >>>>>>> >>>>>>> >>>>>>>>> I guess the point is that the suggested approach, which >>>> dynamically >>>>>>>>> determines the checkpointing interval based on the backpressure, >>>>> may >>>>>>>> cause >>>>>>>>> regression when the checkpointing interval is relatively low. >>>> This >>>>>>> makes >>>>>>>> it >>>>>>>>> hard for users to enable this feature in production. It is like >>>> an >>>>>>>>> auto-driving system that is not guaranteed to work >>>>>>>> >>>>>>>> Yes, creating a more generic solution that would require less >>>>>>> configuration >>>>>>>> is usually more difficult then static configurations. >>>>>>>> It doesn't mean we shouldn't try to do that. 
Especially that if my >>>>>>> proposed >>>>>>>> algorithm wouldn't work good enough, there is >>>>>>>> an obvious solution, that any source could add a metric, like let >>>> say >>>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` >>>>>>>> could use this as an override to always switch to the >>>>>>>> "slowCheckpointInterval". I don't think we need it, but that's >>>> always >>>>>> an >>>>>>>> option >>>>>>>> that would be basically equivalent to your original proposal. Or >>>> even >>>>>>>> source could add "suggestedCheckpointInterval : int", and >>>>>>>> `CheckpointTrigger` could use that value if present as a hint in >>>> one >>>>>> way >>>>>>> or >>>>>>>> another. >>>>>>>> >>>>>>> >>>>>>> So far we have talked about the possibility of using >>>> CheckpointTrigger >>>>>> and >>>>>>> mentioned the CheckpointTrigger >>>>>>> and read metric values. >>>>>>> >>>>>>> Can you help answer the following questions so that I can understand >>>>> the >>>>>>> alternative solution more concretely: >>>>>>> >>>>>>> - What is the interface of this CheckpointTrigger? For example, are >>>> we >>>>>>> going to give CheckpointTrigger a context that it can use to fetch >>>>>>> arbitrary metric values? This can help us understand what information >>>>>> this >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint >>>> decision. >>>>>>> - Where is this CheckpointTrigger running? For example, is it going >>>> to >>>>>> run >>>>>>> on the subtask of every source operator? Or is it going to run on the >>>>> JM? >>>>>>> - Are we going to provide a default implementation of this >>>>>>> CheckpointTrigger in Flink that implements the algorithm described >>>>> below, >>>>>>> or do we expect each source operator developer to implement their own >>>>>>> CheckpointTrigger? >>>>>>> - How can users specify the >>>>>> fastCheckpointInterval/slowCheckpointInterval? >>>>>>> For example, will we provide APIs on the CheckpointTrigger that >>>>> end-users >>>>>>> can use to specify the checkpointing interval? What would that look >>>>> like? >>>>>>> >>>>>>> Overall, my gut feel is that the alternative approach based on >>>>>>> CheckpointTrigger is more complicated and harder to use. And it >>>>> probably >>>>>>> also has the issues of "having two places to configure checkpointing >>>>>>> interval" and "giving flexibility for every source to implement a >>>>>> different >>>>>>> API" (as mentioned below). >>>>>>> >>>>>>> Maybe we can evaluate it more after knowing the answers to the above >>>>>>> questions. >>>>>>> >>>>>>> >>>>>>> >>>>>>>> >>>>>>>>> On the other hand, the approach currently proposed in the FLIP is >>>>>> much >>>>>>>>> simpler as it does not depend on backpressure. Users specify the >>>>>> extra >>>>>>>>> interval requirement on the specific sources (e.g. HybridSource, >>>>>> MySQL >>>>>>>> CDC >>>>>>>>> Source) and can easily know the checkpointing interval will be >>>> used >>>>>> on >>>>>>>> the >>>>>>>>> continuous phase of the corresponding source. This is pretty much >>>>>> same >>>>>>> as >>>>>>>>> how users use the existing execution.checkpointing.interval >>>> config. >>>>>> So >>>>>>>>> there is no extra concern of regression caused by this approach. 
>>>>>>>> >>>>>>>> To an extent, but as I have already previously mentioned I really >>>>>> really >>>>>>> do >>>>>>>> not like idea of: >>>>>>>> - having two places to configure checkpointing interval (config >>>>> file >>>>>>> and >>>>>>>> in the Source builders) >>>>>>>> - giving flexibility for every source to implement a different >>>> API >>>>>> for >>>>>>>> that purpose >>>>>>>> - creating a solution that is not generic enough, so that we will >>>>>> need >>>>>>> a >>>>>>>> completely different mechanism in the future anyway >>>>>>>> >>>>>>> >>>>>>> Yeah, I understand different developers might have different >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a >>>>> perfect >>>>>>> solution and we have to choose based on the pros/cons of these >>>>> solutions. >>>>>>> >>>>>>> I agree with you that, all things being equal, it is preferable to 1) >>>>>> have >>>>>>> one place to configure checkpointing intervals, 2) have all source >>>>>>> operators use the same API, and 3) create a solution that is generic >>>>> and >>>>>>> last lasting. Note that these three goals affects the usability and >>>>>>> extensibility of the API, but not necessarily the >>>> stability/performance >>>>>> of >>>>>>> the production job. >>>>>>> >>>>>>> BTW, there are also other preferrable goals. For example, it is very >>>>>> useful >>>>>>> for the job's behavior to be predictable and interpretable so that >>>> SRE >>>>>> can >>>>>>> operator/debug the Flink in an easier way. We can list these >>>> pros/cons >>>>>>> altogether later. >>>>>>> >>>>>>> I am wondering if we can first agree on the priority of goals we want >>>>> to >>>>>>> achieve. IMO, it is a hard-requirement for the user-facing API to be >>>>>>> clearly defined and users should be able to use the API without >>>> concern >>>>>> of >>>>>>> regression. And this requirement is more important than the other >>>> goals >>>>>>> discussed above because it is related to the stability/performance of >>>>> the >>>>>>> production job. What do you think? >>>>>>> >>>>>>> >>>>>>>> >>>>>>>>> Sounds good. Looking forward to learning more ideas. >>>>>>>> >>>>>>>> I have thought about this a bit more, and I think we don't need to >>>>>> check >>>>>>>> for the backpressure status, or how much overloaded all of the >>>>>> operators >>>>>>>> are. >>>>>>>> We could just check three things for source operators: >>>>>>>> 1. pendingRecords (backlog length) >>>>>>>> 2. numRecordsInPerSecond >>>>>>>> 3. backPressuredTimeMsPerSecond >>>>>>>> >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config >>>>>>>> // Next line calculates how many records can we consume from the >>>>>> backlog, >>>>>>>> assuming >>>>>>>> // that magically the reason behind a backpressure vanishes. We >>>> will >>>>>> use >>>>>>>> this only as >>>>>>>> // a safeguard against scenarios like for example if backpressure >>>>> was >>>>>>>> caused by some >>>>>>>> // intermittent failure/performance degradation. >>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / >>>>> (1000 >>>>>>>> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval >>>>>>>> >>>>>>> >>>>>>> I am not sure if there is a typo. Because if >>>>>> backPressuredTimeMsPerSecond = >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure = >>>> numRecordsInPerSecond / >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm. 
>>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = >>>>>> (numRecordsInPerSecond >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * >>>> metricsUpdateInterval"? >>>>>>> >>>>>>>> >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the >>>>>>> backlog >>>>>>>> as >>>>>>>> // a safeguard against an intermittent back pressure problems, so >>>>> that >>>>>> we >>>>>>>> don't >>>>>>>> // calculate next checkpoint interval far far in the future, while >>>>> the >>>>>>>> backpressure >>>>>>>> // goes away before we will recalculate metrics and new >>>> checkpointing >>>>>>>> interval >>>>>>>> timeToConsumeBacklog = (pendingRecords - >>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond >>>>>>>> >>>>>>>> >>>>>>>> Then we can use those numbers to calculate desired checkpointed >>>>>> interval >>>>>>>> for example like this: >>>>>>>> >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; >>>> //this >>>>>> may >>>>>>>> need some refining >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval, >>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval); >>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval; >>>>>>>> >>>>>>>> WDYT? >>>>>>> >>>>>>> >>>>>>> I think the idea of the above algorithm is to incline to use the >>>>>>> fastCheckpointInterval unless we are very sure the backlog will take >>>> a >>>>>> long >>>>>>> time to process. This can alleviate the concern of regression during >>>>> the >>>>>>> continuous_unbounded phase since we are more likely to use the >>>>>>> fastCheckpointInterval. However, it can cause regression during the >>>>>> bounded >>>>>>> phase. >>>>>>> >>>>>>> I will use a concrete example to explain the risk of regression: >>>>>>> - The user is using HybridSource to read from HDFS followed by Kafka. >>>>> The >>>>>>> data in HDFS is old and there is no need for data freshness for the >>>>> data >>>>>> in >>>>>>> HDFS. >>>>>>> - The user configures the job as below: >>>>>>> - fastCheckpointInterval = 3 minutes >>>>>>> - slowCheckpointInterval = 30 minutes >>>>>>> - metricsUpdateInterval = 100 ms >>>>>>> >>>>>>> Using the above formula, we can know that once pendingRecords >>>>>>> <= numRecordsInPerSecond * 30-minutes, then >>>>> calculatedCheckpointInterval >>>>>> <= >>>>>>> 3 minutes, meaning that we will use fastCheckpointInterval as the >>>>>>> checkpointing interval. Then in the last 30 minutes of the bounded >>>>> phase, >>>>>>> the checkpointing frequency will be 10X higher than what the user >>>>> wants. >>>>>>> >>>>>>> Also note that the same issue would also considerably limit the >>>>> benefits >>>>>> of >>>>>>> the algorithm. For example, during the continuous phase, the >>>> algorithm >>>>>> will >>>>>>> only be better than the approach in FLIP-309 when there is at least >>>>>>> 30-minutes worth of backlog in the source. >>>>>>> >>>>>>> Sure, having a slower checkpointing interval in this extreme case >>>>> (where >>>>>>> there is 30-minutes backlog in the continuous-unbounded phase) is >>>> still >>>>>>> useful when this happens. But since this is the uncommon case, and >>>> the >>>>>>> right solution is probably to do capacity planning to avoid this from >>>>>>> happening in the first place, I am not sure it is worth optimizing >>>> for >>>>>> this >>>>>>> case at the cost of regression in the bounded phase and the reduced >>>>>> operational predictability for users (e.g.
what checkpointing >>>> interval >>>>>>> should I expect at this stage of the job). >>>>>>> >>>>>>> I think the fundamental issue with this algorithm is that it is >>>> applied >>>>>> to >>>>>>> both the bounded phases and the continous_unbounded phases without >>>>>> knowing >>>>>>> which phase the job is running at. The only information it can access >>>>> is >>>>>>> the backlog. But two sources with the same amount of backlog do not >>>>>>> necessarily mean they have the same data freshness requirement. >>>>>>> >>>>>>> In this particular example, users know that the data in HDFS is very >>>>> old >>>>>>> and there is no need for data freshness. Users can express signals >>>> via >>>>>> the >>>>>>> per-source API proposed in the FLIP. This is why the current approach >>>>> in >>>>>>> FLIP-309 can be better in this case. >>>>>>> >>>>>>> What do you think? >>>>>>> >>>>>>> Best, >>>>>>> Dong >>>>>>> >>>>>>> >>>>>>>> >>>>>>>> Best, >>>>>>>> Piotrek >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >> >>