Hi Piotr,

Any suggestions on how we can practically move forward to address the target use-case?
My understanding is that the current proposal does not have any correctness/performance issues. And it allows the extension to support all the extra use-cases without having to throw away the proposed APIs. If you prefer to have a better solution with simpler APIs and yet the same or better correctness/performance for the target use-case, could you please kindly explain its API design so that we can continue the discussion? Best, Dong On Mon, Jul 3, 2023 at 6:39 PM Dong Lin <lindon...@gmail.com> wrote: > Hi Piotr, > > Please see my comments inline. > > On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski <piotr.nowoj...@gmail.com> > wrote: > >> Hi Dong, >> >> Starting from the end: >> >> > It seems that the only benefit of this approach is to avoid >> > adding SplitEnumeratorContext#setIsProcessingBacklog. >> >> Yes, that's the major benefit of this counter-proposal. >> >> > In the target use-case, users still want to do checkpoints (though at a >> > larger interval) when there is backlog. And HybridSource needs to know >> the >> > expected checkpoint interval during backlog in order to determine >> whether >> > it should keep throwing CheckpointException. Thus, we still need to add >> > execution.checkpointing.interval-during-backlog for users to specify >> this >> > information. >> > >> > The downside of this approach is that it is hard to enforce the >> > semantics specified by execution.checkpointing.interval-during-backlog. >> For >> > example, suppose execution.checkpointing.interval = 3 minutes and >> > execution.checkpointing.interval-during-backlog = 7 minutes. During the >> > backlog phase, the checkpoint coordinator will still trigger the checkpoint >> > once every 3 minutes. HybridSource will need to reject 2 out of every 3 >> > checkpoint invocations, and the effective checkpoint interval will be 9 >> > minutes. >> >> Does it really matter what's the exact value of the longer interval?
Can >> we not >> hard-code it to be 5x or 10x of the base checkpoint interval? If there is >> a >> noticeable overhead from the base interval slowing down the record processing rate, >> reducing this interval by a factor of 5x or 10x would fix the performance >> issue for >> the vast majority of users. So a source could just skip 4 out of 5 or 9 out of >> 10 >> checkpoints. >> > > Yes, I think the exact value of the longer interval matters. > > The main reason we need two intervals is for jobs which have a two-phase > commit sink. The short interval typically represents the interval that a > user can accept for the two-phase commit sink to buffer data (since it can > only emit data when a checkpoint is triggered). And the long interval > typically represents the maximum amount of duplicate work (in terms of > time) that a job needs to redo after failover. > > Since there is no intrinsic relationship between the data buffer interval > (related to processing latency) and the failover boundary, I don't think we > can hardcode it to be 5x or 10x of the base checkpoint interval. > > >> Alternatively we could introduce a config option like: >> >> execution.checkpointing.long-interval >> >> that might be re-used in the future, with more fancy algorithms, but I >> don't see >> much value in doing that. > > >> > Overall, I think the solution is a bit hacky. I think it is preferred >> to >> > throw an exception only when there is indeed an error. If we don't need >> > a checkpoint, it is preferred to not trigger the checkpoint in the >> first >> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog >> is >> > probably not that much of a big deal. >> >> Yes it's hacky, but at least it doesn't require extending the Public API >> for a >> quite limited solution, that only targets one or two sources that are >> rarely used. >> > > I am not sure it is fair to say MySQL CDC source is "rarely used". > ververica/flink-cdc-connectors GitHub repo has 4K+ stars.
Also, note that > the proposed feature can be useful for CDC sources with an internal > "backlog phase". Its usage is not limited to just the two sources mentioned > in the FLIP. > > >> >> ================ >> >> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a >> feeling that >> this is overly complicated and would require every operator/function to >> handle that. >> >> Yes it would cover even more use cases, at the cost of complicating the >> system by >> a lot. IMO it looks like something we could do if there would indeed be a >> high >> demand for such a feature, after we provide some baseline generic solution >> that >> doesn't require any configuration. >> >> I have a feeling that by just statically looking at the shape of the job >> graph and how >> it is connected, we could deduce almost the same things. >> > > Note that pretty much every FLIP will address its use-case at the cost > of complicating the system. I understand you have the feeling that this > complexity is not worthwhile. However, as we can see from the comments in > this thread and the votes in the voting thread, many > committers/developers/users actually welcome the feature introduced in this > FLIP. > > I am happy to work together with you to find a more generic and simpler > solution, as long as that solution can address the target use-case without > hurting the user experience. The alternative solution which you have mentioned > so far, unfortunately, still has drawbacks as mentioned earlier. > > >> Also: >> >> > - the FLIP suggests to use the long checkpointing interval as long as >> any subtask is processing the backlog. Are you sure that's the right call? >> What if other >> > sources are producing fresh records, and those fresh records are >> reaching sinks? It could happen either with a disjoint JobGraph, an >> embarrassingly >> parallel >> > JobGraph (no keyBy/unions/joins), or even with keyBy.
Fresh records can >> slip using a non-backpressured input channel through a generally >> backpressured >> > keyBy exchange. How should we handle that? This problem I think will >> affect every solution, including my previously proposed generic one, but >> we >> should >> > discuss how to handle that as well. >> >> By this I didn't necessarily mean that we have to solve it right now. >> >> ================ >> >> > The comments above seem kind of "abstract". I am hoping to understand >> more >> > technical details behind these comments so that we can see how to >> address >> > the concern. >> >> Over the span of this discussion I think I have already explained many >> times what >> bothers me in the current proposal. >> > > If I understand you correctly, your concern is that the solution is not > generic, cannot address the extra use-cases you want, and is too > complicated. > > I think it is fair to say that no FLIP can be fully generic to address all > use-cases, just like the fact that Flink is still not perfect and still needs > FLIPs to improve its performance/usage. And whether an API is too > complicated really depends on whether there exists a better option. > > Fairly speaking, different people can have different opinions on whether a > proposal is generic and whether it is too complicated. At least from the > comments from other developers in this thread and in the voting thread, > many developers and users actually like the current proposal. I hope you > understand that your concerns mentioned above are subjective and not > necessarily shared by other developers. > > Honestly speaking, if we block this FLIP just because someone thinks it can > be better (yet without any concrete proposal for making it better), I feel > it is not a good outcome for other developers and users who would like to > have this feature to address their existing pain points.
> > I am wondering if you can be a bit more lenient, consider the opinion of > other developers (not just me) who have voted, and allow us to make > incremental progress even though you might find it not meeting your > expectations in its current form? > > >> > For example, even if a FLIP does not address all use-cases >> > (which is arguably true for every FLIP), its solution does not >> necessarily >> > need to be thrown away later as long as it is extensible >> >> That's my main point. I haven't yet seen how proposals from this FLIP, >> that >> could >> extend FLIP-309 to cover the generic use case and: >> - Would work out of the box, for all or the majority of properly >> implemented sources. >> > > I would argue that for the target use-case mentioned in the motivation > section, it is impossible to address the use-case without any code change > from the source, and still have the same stability as the current proposal. > The reason is that when the source does not have event-time, we cannot > correctly derive whether the MySQL CDC source is in the snapshot/binlog > phase by just looking at the processing-time-related metrics or > backpressure. > > We have discussed this issue in detail in the earlier emails of this > thread. I also mentioned that I will add follow-up FLIPs to make use of the > event-time metrics and backpressure metrics to derive backlog status. But > that cannot replace the capability for a source to explicitly specify its > backlog status. > > > >> - Would require zero or very minimal configuration/input from the user. >> Especially >> > > Note that the current proposal only requires the user to specify one extra > config, namely execution.checkpointing.interval-during-backlog. I don't > think we are able to reduce the extra config to zero, due to the reason > explained above (i.e. separate intervals for failover and data freshness). > Therefore, the configuration from the user is already minimal.
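As an aside, the two-interval arithmetic discussed earlier in the thread (a 3-minute base interval and a desired 7-minute backlog interval yielding an effective 9-minute interval) can be sketched in a few lines. This is purely illustrative Python, not Flink code; both function names are invented for the sketch:

```python
import math

def effective_interval_with_rejection(base_interval_min, desired_interval_min):
    # If checkpoints are only ever triggered every base_interval_min minutes
    # and the source rejects each trigger until desired_interval_min has
    # elapsed, the effective interval is rounded up to the next multiple of
    # the base interval.
    return math.ceil(desired_interval_min / base_interval_min) * base_interval_min

def interval_chosen_by_coordinator(is_processing_backlog, interval_min,
                                   interval_during_backlog_min):
    # If the checkpoint coordinator knows the backlog status (e.g. via the
    # proposed SplitEnumeratorContext#setIsProcessingBacklog), it can trigger
    # at exactly the configured interval instead of rejecting triggers.
    return interval_during_backlog_min if is_processing_backlog else interval_min

print(effective_interval_with_rejection(3, 7))     # rejection quantizes 7 -> 9
print(interval_chosen_by_coordinator(True, 3, 7))  # coordinator hits 7 exactly
```

In other words, rejecting coordinator-triggered checkpoints can only approximate interval-during-backlog, which is the enforcement gap described in the quoted 3-minute/7-minute example.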
> > >> wouldn't require implementing some custom things in every source. >> - Could be made to work well enough in the (vast?) majority of the use >> cases. > > > I think you are talking about the case of increasing the checkpoint interval > when backpressure is high etc. I would argue this is a use-case not > targeted by this FLIP and it can be addressed in a follow-up FLIP. > > >> > > >> > So we probably need to understand specifically why the proposed APIs >> would be thrown away. >> >> As I have mentioned many times, that's the case because: >> 1. This solution is not generic enough >> 2. I can see solutions that wouldn't require modification of every source >> 3. They would have zero overlap with the interfaces extension from this >> FLIP >> > > > I have the feeling our discussion is kind of in a loop, where you ask for > a solution without any change to the source (so that it is generic), I > explain why I am not able to find such a solution and the drawbacks of your > proposed solution, and then you repeat the same ask and insist this is > possible. > > If you can find a solution that wouldn't require modification of every > source and still address the target use-case well, could you please kindly > rephrase your solution so that we can revisit it? > > I assume this solution would not require extra config from users, would > not cause the job to use the long checkpoint interval due to random/short > traffic spikes, and would not cause the job to use the short interval when > the job is still reading backlog data. > > I would be happy to be proven wrong if you or anyone else can provide such a > solution without the aforementioned drawbacks. I just hope we don't block > the FLIP forever for a goal that no one can achieve. > > Best, > Dong > > >> >> Best, >> Piotrek >> >> On Sat, Jul 1, 2023 at 17:01 Dong Lin <lindon...@gmail.com> wrote: >> >> > Hi Piotr, >> > >> > Thank you for providing further suggestions to help improve the API. >> Please >> > see my comments inline.
>> > >> > On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org> >> > wrote: >> > >> > > Hey, >> > > >> > > Sorry for the late reply, I was OoO for a week. I have three things to >> > point >> > > out. >> > > >> > > 1. =============== >> > > >> > > The updated proposal is indeed better, but to be honest I still don't >> > like >> > > it, for mostly the same reasons that I have mentioned earlier: >> > > - only a partial solution, that doesn't address all use cases, so we >> > would >> > > need to throw it away sooner or later >> > > - I don't see and it hasn't been discussed how to make it work out of >> the >> > > box for all sources >> > > - somehow complicating the API for people implementing Sources >> > > - it should work out of the box for most of the sources, or at least >> to >> > > have that potential in the future >> > > >> > >> > The comments above seem kind of "abstract". I am hoping to understand >> more >> > technical details behind these comments so that we can see how to >> address >> > the concern. For example, even if a FLIP does not address all use-cases >> > (which is arguably true for every FLIP), its solution does not >> necessarily >> > need to be thrown away later as long as it is extensible. So we probably >> > need to understand specifically why the proposed APIs would be thrown >> away. >> > >> > Similarly, we would need to understand if there is a better design to >> make >> > the API simpler and work out of the box etc. in order to decide how to >> > address these comments. >> > >> > >> > > On top of that: >> > > - the FLIP I think is missing how to hook up SplitEnumeratorContext >> and >> > > CheckpointCoordinator to pass "isProcessingBacklog" >> > > >> > >> > I think it can be passed via the following function chain: >> > - CheckpointCoordinator invokes >> > OperatorCoordinatorCheckpointContext#isProcessingBacklog (via >> > coordinatorsToCheckpoint) to get this information.
>> > - OperatorCoordinatorHolder implements >> > OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns >> > OperatorCoordinator#isProcessingBacklog (via coordinator) >> > - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog >> and >> > returns SourceCoordinatorContext#isProcessingBacklog >> > - SourceCoordinatorContext will implement >> > SplitEnumeratorContext#setIsProcessingBacklog and store the given >> > information in a variable. >> > >> > Note that it involves only internal APIs. We might be able to find a >> simpler >> > solution with fewer functions on the path. As long as the above solution >> > works without having any performance or correctness issues, I think maybe we >> > should focus on the public API design and discuss the implementation in >> the >> > PR review? >> > >> > - the FLIP suggests to use the long checkpointing interval as long as >> any >> > > subtask is processing the backlog. Are you sure that's the right call? >> > What >> > > if other >> > > sources are producing fresh records, and those fresh records are >> > reaching >> > > sinks? It could happen either with a disjoint JobGraph, an embarrassingly >> > parallel >> > > JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records >> can >> > > slip using a non-backpressured input channel through a generally >> > > backpressured >> > > keyBy exchange. How should we handle that? This problem I think will >> > > affect every solution, including my previously proposed generic one, >> but >> > we >> > > should >> > > discuss how to handle that as well. >> > > >> > >> > Good question. Here is my plan to improve the solution in a follow-up >> FLIP: >> > >> > - Let every subtask of every source operator emit >> > RecordAttributes(isBacklog=..) >> > - Let every subtask of every operator handle the RecordAttributes >> received >> > from inputs and emit RecordAttributes to downstream operators. The Flink >> > runtime can derive this information for every one-input operator.
For an >> > operator with two inputs, if one input has isBacklog=true and the other >> has >> > isBacklog=false, the operator should determine the isBacklog for its >> output >> > records based on its semantics. >> > - If there exists a subtask of a two-phase commit operator with >> > isBacklog=false, the operator should let the JM know so that the JM will use >> > the short checkpoint interval (for data freshness). Otherwise, the JM will >> use >> > the long checkpoint interval. >> > >> > The above solution guarantees that, if every two-input operator has >> > explicitly specified its isBacklog based on the inputs' isBacklog, >> then >> > the JM will use the short checkpoint interval if and only if it is >> useful >> > for at least one subtask of one two-phase commit operator. >> > >> > Note that even the above solution might not be perfect. Suppose one >> > subtask of the two-phase commit operator has isBacklog=false, >> > but every other subtask of this operator has isBacklog=true, due to >> load >> > imbalance. In this case, it might be beneficial to use the long >> checkpoint >> > interval to improve the average data freshness for this operator. >> However, >> > as we get into more edge cases, the solution will become more complicated >> > (e.g. providing more APIs for users to specify their intended strategy) >> and >> > there will be fewer additional benefits (because these scenarios are less >> > common). >> > >> > Also, note that we can support the solution described above without >> > throwing away any public API currently proposed in FLIP-309. More >> > specifically, we still >> > need execution.checkpointing.interval-during-backlog. Sources such as >> > HybridSource and MySQL CDC source can still use >> setIsProcessingBacklog() to >> > specify the backlog status. We just need to update >> setIsProcessingBacklog() >> > to emit RecordAttributes(isBacklog=..) upon invocation.
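The propagation and interval-selection rules sketched above can be summarized in a few lines. This is illustrative Python only, not the actual Flink runtime; the function names are made up for the sketch:

```python
def default_combined_is_backlog(input_flags):
    # Default combination for a multi-input operator: the output is still
    # "backlog" only while every input is backlog. (As noted above, a real
    # operator may override this based on its own semantics.)
    return all(input_flags)

def jm_uses_long_checkpoint_interval(two_phase_commit_subtask_flags):
    # The JM uses the long interval only when every subtask of every
    # two-phase-commit operator reports isBacklog=true; a single subtask
    # with isBacklog=false is enough to demand the short interval
    # (for data freshness).
    return all(all(flags) for flags in two_phase_commit_subtask_flags)
```

The load-imbalance edge case mentioned above corresponds to e.g. `[[False, True, True, True]]`, where this policy picks the short interval even though most subtasks are still on backlog.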
>> > >> > I hope the above solution is reasonable and can address most of your >> > concerns. And I hope we can use FLIP-309 to solve a large chunk of the >> > existing problems in the Flink 1.18 release and make further improvements in >> > follow-up FLIPs. What do you think? >> > >> > >> > >> > > 2. =============== >> > > >> > > Regarding the current proposal, there might be a way to make it >> actually >> > > somehow generic (but not pluggable). But it might require slightly >> > > different >> > > interfaces. We could keep the idea that >> SourceCoordinator/SplitEnumerator >> > > is responsible for switching between slow/fast processing modes. It >> could >> > > be >> > > implemented to achieve something like in the FLIP-309 proposal, but >> apart >> > > from that, the default behaviour would be a built-in mechanism working >> like >> > > this: >> > > 1. Every SourceReaderBase checks its metrics and its state, to decide >> if >> > it >> > > considers itself as "processingBacklog" or "veryBackpressured". The >> base >> > > implementation could do it via a similar mechanism as I was >> proposing >> > > previously, via looking at the busy/backPressuredTimeMsPerSecond, >> > > pendingRecords and the processing rate. >> > > 2. SourceReaderBase could send an event with the >> > > "processingBacklog"/"veryBackpressured" state. >> > > 3. SourceCoordinator would collect those events, and decide what >> it >> > > should do, whether it should switch the whole source to the >> > > "processingBacklog"/"veryBackpressured" state or not. >> > That could eventually provide a generic solution that works for every >> > > source that reports the required metrics. Each source implementation >> > could >> > > decide >> > > whether to use that default behaviour, or if maybe it's better to >> > override >> > > the default, or combine the default with something custom (like >> > HybridSource).
>> > > >> > > And as a first step, we could implement that mechanism only on the >> > > SourceCoordinator side, without events, without the default generic >> > > solution, and use >> > > it in the HybridSource/MySQL CDC. >> > > >> > > This approach has some advantages compared to my previous proposal: >> > > + no need to tinker with metrics and pushing metrics from TMs to JM >> > > + somehow communicating this information via Events seems a bit >> cleaner >> > > to me and avoids problems with the freshness of the metrics >> > > And some issues: >> > > - I don't know if it can be made pluggable in the future, i.e. whether a user >> could >> > > implement a custom `CheckpointTrigger` that would automatically work >> with >> > > all/most >> > > of the pre-existing sources >> > > - I don't know if it can be expanded if needed in the future, to >> make >> > > decisions based on operators in the middle of a JobGraph. >> > > >> > >> > Thanks for the proposal. Overall, I agree it is valuable to be able to >> > determine the isProcessingBacklog based on the source reader metrics. >> > >> > I will probably suggest making the following changes to your idea: >> > - Instead of letting the source reader send events to the source >> > coordinator, the source reader can emit RecordAttributes(isBacklog=..) >> as >> > described earlier. We will let two-phase commit operators decide >> whether >> > they need the short checkpoint interval. >> > - We consider isProcessingBacklog=true when watermarkLag is larger than >> a >> > threshold. >> > >> > This is a nice addition. But I think we still need extra information >> from >> > the user (e.g. the threshold for whether the watermarkLag or >> > backPressuredTimeMsPerSecond is too high) with extra public APIs for >> this >> > feature to work reliably. This is because there is no default algorithm >> > that works in all cases without extra specification from users, due to >> the >> > issues around the default algorithm we discussed previously.
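To make the kind of metric-based heuristic discussed here concrete, below is a rough sketch in plain Python. It is hypothetical logic, not an existing Flink API; the threshold parameter is exactly the extra user input argued above to be unavoidable:

```python
def reader_considers_backlog(pending_records, records_per_sec,
                             watermark_lag_ms, lag_threshold_ms):
    # Report backlog when the watermark lags by more than the user-supplied
    # threshold, or when draining the pending records at the current
    # processing rate would take longer than that threshold.
    if records_per_sec <= 0:
        drain_time_ms = float("inf") if pending_records > 0 else 0.0
    else:
        drain_time_ms = pending_records / records_per_sec * 1000.0
    return watermark_lag_ms > lag_threshold_ms or drain_time_ms > lag_threshold_ms
```

Note that no single default value of `lag_threshold_ms` is right for every job (a source without event-time has no watermark lag at all), which is why some public configuration would still be required for a metric-based solution.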
>> > >> > Overall, I think the current proposal in FLIP-309 is a first step >> towards >> > addressing these problems. The API for the source enumerator to explicitly >> set >> > isProcessingBacklog based on its status is useful even if we can support >> > metrics-based solutions. >> > >> > If that looks reasonable, can we agree to make incremental improvements >> and >> > work on the metrics-based solution in a follow-up FLIP? >> > >> > >> > > >> > > 3. =============== >> > > >> > > Independent of that, during some brainstorming between me, Chesnay and >> > > Stefan Richter, an idea popped up that I think could be a counter-proposal >> > > as >> > > an intermediate solution that probably effectively works the same way >> as >> > > the current FLIP-309. >> > > >> > > Inside a HybridSource, from its SplitEnumerator#snapshotState method, >> can >> > > you not throw an exception like >> > > `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or `new >> > > CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`? >> > > Actually we could also introduce a dedicated `CheckpointFailureReason` >> > for >> > > that purpose and handle it in some special way in some places (like maybe >> > hide >> > > such rejected checkpoints from the REST API/WebUI). We could >> elaborate on >> > > this a bit more, but after some brief thinking I could see it actually >> > > working well >> > > enough without any public-facing changes. But I might be wrong here. >> > > >> > > If this feature actually gains traction, we could expand it to >> something >> > > more sophisticated available via a public API in the future. >> > > >> > >> > In the target use-case, users still want to do checkpoints (though at a >> > larger interval) when there is backlog. And HybridSource needs to know >> the >> > expected checkpoint interval during backlog in order to determine >> whether >> > it should keep throwing CheckpointException.
Thus, we still need to add >> > execution.checkpointing.interval-during-backlog for users to specify this >> > information. >> > >> > It seems that the only benefit of this approach is to avoid >> > adding SplitEnumeratorContext#setIsProcessingBacklog. >> > >> > The downside of this approach is that it is hard to enforce the >> > semantics specified by execution.checkpointing.interval-during-backlog. >> For >> > example, suppose execution.checkpointing.interval = 3 minutes and >> > execution.checkpointing.interval-during-backlog = 7 minutes. During the >> > backlog phase, the checkpoint coordinator will still trigger the checkpoint >> > once every 3 minutes. HybridSource will need to reject 2 out of every 3 >> > checkpoint invocations, and the effective checkpoint interval will be 9 >> > minutes. >> > >> > Overall, I think the solution is a bit hacky. I think it is preferred to >> > throw an exception only when there is indeed an error. If we don't need >> > a checkpoint, it is preferred to not trigger the checkpoint in the first >> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog >> is >> > probably not that much of a big deal. >> > >> > Thanks for all the comments. I am looking forward to your thoughts. >> > >> > Best, >> > Dong >> > >> > >> > > >> > > =============== >> > > >> > > Sorry for disturbing this FLIP discussion and voting. >> > > >> > > Best, >> > > Piotrek >> > > >> > > On Thu, Jun 29, 2023 at 05:08 feng xiangyu <xiangyu...@gmail.com> >> > wrote: >> > > >> > > > Hi Dong, >> > > > >> > > > Thanks for your quick reply. I think this has truly solved our >> problem >> > > and >> > > > will enable us to upgrade our existing jobs more seamlessly. >> > > > >> > > > Best, >> > > > Xiangyu >> > > > >> > > > Dong Lin <lindon...@gmail.com> wrote on Thu, Jun 29, 2023 at 10:50: >> > > > >> > > > > Hi Feng, >> > > > > >> > > > > Thanks for the feedback.
Yes, you can configure the >> > > > > execution.checkpointing.interval-during-backlog to effectively >> > disable >> > > > > checkpointing during backlog. >> > > > > >> > > > > Prior to your comment, the FLIP allowed users to do this by setting >> > the >> > > > > config value to something large (e.g. 365 days). After thinking >> about >> > > this >> > > > > more, we think it is more usable to allow users to achieve this >> goal >> > by >> > > > > setting the config value to 0. This is consistent with the >> existing >> > > > > behavior of execution.checkpointing.interval -- checkpointing is >> > > > disabled >> > > > > if the user sets execution.checkpointing.interval to 0. >> > > > > >> > > > > We have updated the description of >> > > > > execution.checkpointing.interval-during-backlog >> > > > > to say the following: >> > > > > ... it is not null, the value must either be 0, which means the >> > > > checkpoint >> > > > > is disabled during backlog, or be larger than or equal to >> > > > > execution.checkpointing.interval. >> > > > > >> > > > > Does this address your need? >> > > > > >> > > > > Best, >> > > > > Dong >> > > > > >> > > > > >> > > > > >> > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu < >> xiangyu...@gmail.com> >> > > > wrote: >> > > > > >> > > > > > Hi Dong and Yunfeng, >> > > > > > >> > > > > > Thanks for the proposal, your FLIP sounds very useful from my >> > > > > perspective. >> > > > > > In our business, when using hybrid source in production we >> also >> > > met >> > > > > the >> > > > > > problem described in your FLIP. >> > > > > > In our solution, we tend to skip making any checkpoints before >> all >> > > > batch >> > > > > > tasks have finished and resume the periodic checkpoint only in >> the >> > > > streaming >> > > > > > phase. With this FLIP, we can solve our problem in a more >> > generic >> > > > way.
>> > > > > > >> > > > > > However, I am wondering if we still want to skip making any >> > > checkpoints >> > > > > > during the historical phase, can we set this configuration >> > > > > > "execution.checkpointing.interval-during-backlog" to "-1" to >> > > cover >> > > > > this >> > > > > > case? >> > > > > > >> > > > > > Best, >> > > > > > Xiangyu >> > > > > > >> > > > > > Hang Ruan <ruanhang1...@gmail.com> wrote on Wed, Jun 28, 2023 at 16:30: >> > > > > > >> > > > > > > Thanks for Dong and Yunfeng's work. >> > > > > > > >> > > > > > > The FLIP looks good to me. This new version is clearer to >> > > understand. >> > > > > > > >> > > > > > > Best, >> > > > > > > Hang >> > > > > > > >> > > > > > > Dong Lin <lindon...@gmail.com> wrote on Tue, Jun 27, 2023 at 16:53: >> > > > > > > >> > > > > > > > Thanks Jark, Jingsong, and Zhu for the review! >> > > > > > > > >> > > > > > > > Thanks Zhu for the suggestion. I have updated the >> configuration >> > > > name >> > > > > as >> > > > > > > > suggested. >> > > > > > > > >> > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com> >> > > wrote: >> > > > > > > > >> > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving >> > this >> > > > > > > > discussion. >> > > > > > > > > >> > > > > > > > > The new design looks generally good to me. Increasing the >> > > > > checkpoint >> > > > > > > > > interval when the job is processing backlogs is easier for >> > > users >> > > > to >> > > > > > > > > understand and can help in more scenarios. >> > > > > > > > > >> > > > > > > > > I have one comment about the new configuration. >> > > > > > > > > Naming the new configuration >> > > > > > > > > "execution.checkpointing.interval-during-backlog" would be >> > > better >> > > > > > > > > according to the Flink config naming convention. >> > > > > > > > > This is also because nested config keys should be avoided. >> See >> > > > > > > > > FLINK-29372 for more details.
>> > > > > > > > > >> > > > > > > > > Thanks, >> > > > > > > > > Zhu >> > > > > > > > > >> > > > > > > > > Jingsong Li <jingsongl...@gmail.com> wrote on >> Tue, Jun 27, 2023 at 15:45: >> > > > > > > > > > >> > > > > > > > > > Looks good to me! >> > > > > > > > > > >> > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and >> > design. >> > > > > > > > > > >> > > > > > > > > > Best, >> > > > > > > > > > Jingsong >> > > > > > > > > > >> > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu < >> imj...@gmail.com> >> > > > > wrote: >> > > > > > > > > > > >> > > > > > > > > > > Thank you Dong for driving this FLIP. >> > > > > > > > > > > >> > > > > > > > > > > The new design looks good to me! >> > > > > > > > > > > >> > > > > > > > > > > Best, >> > > > > > > > > > > Jark >> > > > > > > > > > > >> > > > > > > > > > > > On Jun 27, 2023 at 14:38, Dong Lin <lindon...@gmail.com> wrote: >> > > > > > > > > > > > >> > > > > > > > > > > > Thank you Leonard for the review! >> > > > > > > > > > > > >> > > > > > > > > > > > Hi Piotr, do you have any comments on the latest >> > > proposal? >> > > > > > > > > > > > >> > > > > > > > > > > > I am wondering if it is OK to start the voting >> thread >> > > this >> > > > > > week. >> > > > > > > > > > > > >> > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu < >> > > > > xbjt...@gmail.com> >> > > > > > > > > wrote: >> > > > > > > > > > > > >> > > > > > > > > > > >> Thanks Dong for driving this FLIP forward!
>> > > > > > > > > > > >> >> > > > > > > > > > > >> Introducing the `backlog status` concept for Flink jobs >> > > makes >> > > > > > sense >> > > > > > > to >> > > > > > > > > me for the >> > > > > > > > > > > >> following reasons: >> > > > > > > > > > > >> >> > > > > > > > > > > >> From the concept/API design perspective, it's more >> general >> > > and >> > > > > > > natural >> > > > > > > > > than the >> > > > > > > > > > > >> above proposals as it can be used in HybridSource >> for >> > > > > bounded >> > > > > > > > > records, CDC >> > > > > > > > > > > >> Source for history snapshots and general sources >> like >> > > > > > KafkaSource >> > > > > > > > for >> > > > > > > > > > > >> historical messages. >> > > > > > > > > > > >> >> > > > > > > > > > > >> From user cases/requirements, I've seen many users >> > > > manually >> > > > > > > set a >> > > > > > > > > larger >> > > > > > > > > > > >> checkpoint interval during backfilling and then >> set a >> > > > > shorter >> > > > > > > > > checkpoint >> > > > > > > > > > > >> interval for real-time processing in their >> production >> > > > > > > environments >> > > > > > > > > as a >> > > > > > > > > > > >> Flink application optimization. Now, the Flink >> > framework >> > > > can >> > > > > > > apply >> > > > > > > > > this >> > > > > > > > > > > >> optimization without requiring the user to reset the >> > > > > checkpoint >> > > > > > > > > interval and >> > > > > > > > > > > >> restart the job multiple times. >> > > > > > > > > > > >> >> > > > > > > > > > > >> Following the support for a larger checkpoint interval for >> jobs >> > > under >> > > > > > > backlog >> > > > > > > > > status >> > > > > > > > > > > >> in the current FLIP, we can explore supporting larger >> > > > > > > > > parallelism/memory/CPU >> > > > > > > > > > > >> for jobs under backlog status in the future. >> > > > > > > > > > > >> >> > > > > > > > > > > >> In short, the updated FLIP looks good to me.
>> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >> Best, >> > > > > > > > > > > >> Leonard >> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin < >> > > > > lindon...@gmail.com> >> > > > > > > > > wrote: >> > > > > > > > > > > >>> >> > > > > > > > > > > >>> Hi Piotr, >> > > > > > > > > > > >>> >> > > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog >> > > > concept. >> > > > > > > > > > > >>> >> > > > > > > > > > > >>> After discussing with Becket Qin and thinking >> about >> > > this >> > > > > > more, >> > > > > > > I >> > > > > > > > > agree it >> > > > > > > > > > > >>> is a better idea to add a top-level concept to all >> > > source >> > > > > > > > > operators to >> > > > > > > > > > > >>> address the target use-case. >> > > > > > > > > > > >>> >> > > > > > > > > > > >>> The main reason that changed my mind is that >> > > > > > > isProcessingBacklog >> > > > > > > > > can be >> > > > > > > > > > > >>> described as an inherent/nature attribute of every >> > > source >> > > > > > > > instance >> > > > > > > > > and >> > > > > > > > > > > >> its >> > > > > > > > > > > >>> semantics does not need to depend on any specific >> > > > > > checkpointing >> > > > > > > > > policy. >> > > > > > > > > > > >>> Also, we can hardcode the isProcessingBacklog >> > behavior >> > > > for >> > > > > > the >> > > > > > > > > sources we >> > > > > > > > > > > >>> have considered so far (e.g. HybridSource and >> MySQL >> > CDC >> > > > > > source) >> > > > > > > > > without >> > > > > > > > > > > >>> asking users to explicitly configure the >> per-source >> > > > > behavior, >> > > > > > > > which >> > > > > > > > > > > >> indeed >> > > > > > > > > > > >>> provides better user experience. >> > > > > > > > > > > >>> >> > > > > > > > > > > >>> I have updated the FLIP based on the latest >> > > suggestions. 
>> > > > > > > > > > > >>> The latest FLIP no longer introduces a per-source config that can be used by end-users. While I agree with you that CheckpointTrigger can be a useful feature to address additional use-cases, I am not sure it is necessary for the use-case targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately in another FLIP?
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Can you help take another look at the updated FLIP?
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Best,
>> > > > > > > > > > > >>> Dong
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>> Hi Dong,
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being "backpressured" at a given time (due to random traffic spikes). Then at any given time, the chance of the job being considered not-backpressured = (1-0.01)^1000. Since we evaluate the backpressure metric once a second, the estimated time for the job to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> This means that the job will effectively always use the longer checkpointing interval. It looks like a real concern, right?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Sorry, I don't understand where you are getting those numbers from.
>> > > > > > > > > > > >>>> Instead of trying to find loophole after loophole, could you try to think about how a given loophole could be improved/solved?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the APIs, for the following reasons.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Please propose something. I don't think it's needed.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - For the use-case mentioned in the FLIP-309 motivation section, would the APIs of this alternative approach be more or less usable?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Everything that you originally wanted to achieve in FLIP-309, you could do as well in my proposal.
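(As an editorial aside: the 6.4-hour figure debated in the exchange above can be reproduced numerically. A quick back-of-envelope sketch, taking the thread's stated assumptions of 1000 subtasks, a 1% per-subtask backpressure probability, and one metric evaluation per second:)

```python
# Reproduce the estimate from the thread: the job looks not-backpressured
# only when *every* subtask is not backpressured at the sampled instant,
# i.e. with probability (1 - 0.01) ** 1000. With one evaluation per second,
# the expected wait for one such evaluation is the reciprocal of that
# probability (a geometric-distribution mean).
p_subtask = 0.01   # per-subtask backpressure probability (assumption from the thread)
subtasks = 1000

p_job_not_backpressured = (1 - p_subtask) ** subtasks
expected_wait_s = 1 / p_job_not_backpressured

print(f"P(job not backpressured) = {p_job_not_backpressured:.3e}")
print(f"expected wait = {expected_wait_s:.0f} s = {expected_wait_s / 3600:.1f} h")
```

This matches the 23163 sec ≈ 6.4 hours quoted in the thread, under those assumptions.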
>> > > > > > > > > > > >>>> Vide my many mentions of the "hacky solution".
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow the checkpointing interval to change dynamically even during the unbounded phase) as it claims?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> I don't see why not.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in FLIP-309?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Yes.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> For example, if the APIs of this alternative approach can be decoupled from the APIs currently proposed in FLIP-309, then it might be reasonable to work on this extra use-case with a more advanced/complicated design separately in a follow-up work.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> As I voiced my concerns previously, the current design of FLIP-309 would clog the public API and in the long run confuse the users. IMO it's addressing the problem in the wrong place.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
>> > > > > > > > > > > >>>>> - Have all source operators emit a metric named "processingBacklog".
>> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the checkpointing interval to be used when any source is processing backlog".
>> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog" flags periodically from all source operators and uses the newly added config value as appropriate.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Yes.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> The challenge with this approach is that we need to define the semantics of this "processingBacklog" metric and have all source operators implement it. I am not sure we are able to do this yet without having users explicitly provide this information on a per-source basis.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source: should it emit "processingBacklog=true"? If yes, then the job might use the long checkpointing interval even if the job is asked to process data starting from now to the next 1 hour. If no, then the job might use the short checkpointing interval even if the job is asked to re-process data starting from 7 days ago.
>> > > > > > > > > > > >>>> Yes. The same can be said of your proposal. It has the very same issues: every source would have to implement it differently, most sources would have no idea how to properly calculate the newly requested checkpoint interval, and for those that do know how, the user would have to configure every source individually. Yet again we would end up with a system that works only partially, in some special use cases (HybridSource), and that confuses the users even more.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> That's why I think the more generic solution, working primarily on the same metrics that are used by various auto-scaling solutions (like the Flink K8s operator's autoscaler), would be better. The hacky solution I proposed was meant to:
>> > > > > > > > > > > >>>> 1. show you that the generic solution is simply a superset of your proposal
>> > > > > > > > > > > >>>> 2. if you are adamant that the busyness/backpressured/records-processing-rate/pending-records metrics wouldn't cover your use case sufficiently (imo they can), then you can very easily enhance this algorithm using some hints from the sources, like "processingBacklog==true", to short-circuit the main algorithm if `processingBacklog` is available.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Best,
>> > > > > > > > > > > >>>> Piotrek
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> On Fri, Jun 16, 2023 at 04:45, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hi again Piotr,
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Thank you for the reply. Please see my reply inline.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> Hi again Dong,
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> I understand that JM will get the backpressure-related metrics every time the RestServerEndpoint receives the REST request to get these metrics.
>> > > > > > > > > > > >>>>>>> But I am not sure if RestServerEndpoint is already always receiving the REST metrics at a regular interval (suppose there is no human manually opening/clicking the Flink Web UI). And if it does, what is the interval?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Good catch, I thought that metrics were pre-emptively sent to the JM every 10 seconds. Indeed that's not the case at the moment, and that would have to be improved.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying this much overhead just for metrics monitoring. That is the main reason I still doubt it is true. Can you show where this 100 ms is currently configured?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke the REST API at a 100 ms interval. Then that means we need to considerably increase the network/cpu overhead at the JM, where the overhead will increase as the number of TMs/slots increases, which may pose a risk to the scalability of the proposed design. I am not sure we should do this. What do you think?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Sorry, I didn't mean that the metric should be reported every 100 ms. I meant that "backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s" once per metric interval (10s?).
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being "backpressured" at a given time (due to random traffic spikes). Then at any given time, the chance of the job being considered not-backpressured = (1-0.01)^1000. Since we evaluate the backpressure metric once a second, the estimated time for the job to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> This means that the job will effectively always use the longer checkpointing interval. It looks like a real concern, right?
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For example, are we going to give CheckpointTrigger a context that it can use to fetch arbitrary metric values? This can help us understand what information this user-defined CheckpointTrigger can use to make the checkpoint decision.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> I honestly don't think this is important at this stage of the discussion. It could have whatever interface we would deem to be best.
>> > > > > > > > > > > >>>>>> Required things:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> - access to at least a subset of the metrics that the given `CheckpointTrigger` requests, for example via some registration mechanism, so we don't have to fetch all of the metrics from the TMs all the time.
>> > > > > > > > > > > >>>>>> - some way to influence the `CheckpointCoordinator`, either via manually triggering checkpoints and/or the ability to change the checkpointing interval.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the APIs, for the following reasons.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> We would need to know the concrete APIs to gauge the following:
>> > > > > > > > > > > >>>>> - For the use-case mentioned in the FLIP-309 motivation section, would the APIs of this alternative approach be more or less usable?
>> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow the checkpointing interval to change dynamically even during the unbounded phase) as it claims?
>> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in FLIP-309?
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> For example, if the APIs of this alternative approach can be decoupled from the APIs currently proposed in FLIP-309, then it might be reasonable to work on this extra use-case with a more advanced/complicated design separately in a follow-up work.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it going to run on the subtask of every source operator? Or is it going to run on the JM?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> IMO on the JM.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> - Are we going to provide a default implementation of this CheckpointTrigger in Flink that implements the algorithm described below, or do we expect each source operator developer to implement their own CheckpointTrigger?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> As I mentioned before, I think we should provide at the very least the implementation that replaces the current triggering mechanism (a statically configured checkpointing interval), and it would be great to provide the backpressure-monitoring trigger as well.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I agree that if there is a good use-case that can be addressed by the proposed CheckpointTrigger, then it is reasonable to add CheckpointTrigger and replace the current triggering mechanism with it.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I also agree that we will likely find such a use-case. For example, suppose the source records have event timestamps; then it is likely that we can use the trigger to dynamically control the checkpointing interval based on the difference between the watermark and the current system time.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> But I am not sure the addition of this CheckpointTrigger should be coupled with FLIP-309. Whether or not it is coupled probably depends on the concrete API design around CheckpointTrigger.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> If you are adamant that the backpressure monitoring doesn't cover your use case well enough, I would be ok to provide the hacky version that I also mentioned before:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> """
>> > > > > > > > > > > >>>>>> Especially that if my proposed algorithm wouldn't work well enough, there is an obvious solution: any source could add a metric, like let's say "processingBacklog: true/false", and the `CheckpointTrigger` could use this as an override to always switch to the "slowCheckpointInterval". I don't think we need it, but that's always an option that would be basically equivalent to your original proposal.
>> > > > > > > > > > > >>>>>> """
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
>> > > > > > > > > > > >>>>> - Have all source operators emit a metric named "processingBacklog".
>> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the checkpointing interval to be used when any source is processing backlog".
>> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog" flags periodically from all source operators and uses the newly added config value as appropriate.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> The challenge with this approach is that we need to define the semantics of this "processingBacklog" metric and have all source operators implement it. I am not sure we are able to do this yet without having users explicitly provide this information on a per-source basis.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source: should it emit "processingBacklog=true"? If yes, then the job might use the long checkpointing interval even if the job is asked to process data starting from now to the next 1 hour. If no, then the job might use the short checkpointing interval even if the job is asked to re-process data starting from 7 days ago.
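(Editorial aside: the JM-side decision being debated here fits in a few lines. This is an illustrative model only; the function and parameter names are hypothetical, not Flink APIs, and it simply encodes "if any source reports processingBacklog=true, use the longer interval":)

```python
# Illustrative model of the discussed behavior: each source reports a
# processingBacklog flag; if any source is still processing backlog, the
# job checkpoints at the longer "backlog" interval, otherwise at the
# regular interval. All names here are invented for illustration.
def effective_checkpoint_interval(
    backlog_flags,              # processingBacklog flag collected from each source
    regular_interval_s,         # e.g. execution.checkpointing.interval
    backlog_interval_s,         # e.g. the proposed interval-during-backlog
):
    return backlog_interval_s if any(backlog_flags) else regular_interval_s

# One source still in backlog: the longer interval wins.
print(effective_checkpoint_interval([False, True], 3 * 60, 7 * 60))
# All sources caught up: back to the regular interval.
print(effective_checkpoint_interval([False, False], 3 * 60, 7 * 60))
```

The open question in the thread is not this selection logic but who can reliably set the per-source flags in the first place.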
>> > > > > > > > > > > >>>>>>> - How can users specify the fastCheckpointInterval/slowCheckpointInterval? For example, will we provide APIs on the CheckpointTrigger that end-users can use to specify the checkpointing interval? What would that look like?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Also as I mentioned before, just like metric reporters are configured:
>> > > > > > > > > > > >>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
>> > > > > > > > > > > >>>>>> Every CheckpointTrigger could have its own custom configuration.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative approach based on CheckpointTrigger is more complicated
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Yes, as usual, more generic things are more complicated, but often more useful in the long run.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> and harder to use.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> I don't agree. Why would setting in the config
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
>> > > > > > > > > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
>> > > > > > > > > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> (we could even provide a shortcut to the above construct via:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s
>> > > > > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> ) be harder than setting two/three checkpoint intervals, one in the config or via `env.enableCheckpointing(x)`, and secondly passing one/two (fast/slow) values on the source itself?
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> If we can address the use-case by providing just the two job-level configs as described above, I agree it will indeed be simpler.
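(Editorial aside: the fast/slow switch behind the proposed BackPressureMonitoringCheckpointTrigger could look roughly like the sketch below. This is a sketch under assumed semantics: the 500 ms/s threshold and all names are invented for illustration and are not part of any FLIP or Flink API.)

```python
# Rough sketch of the fast-interval/slow-interval switch discussed above.
# backPressuredTimeMsPerSecond close to 1000 means a subtask is almost
# always backpressured, which the proposal treats as "catching up on
# backlog", so it checkpoints at the slow interval; once caught up, it
# switches back to the fast interval. The 500 ms/s cut-off is an assumed
# value for illustration only.
def pick_interval(
    back_pressured_ms_per_s,
    fast_interval_s=1.0,       # e.g. execution.checkpointing.fast-interval: 1s
    slow_interval_s=30.0,      # e.g. execution.checkpointing.slow-interval: 30s
    threshold_ms_per_s=500.0,  # assumed cut-off, not from the thread
):
    if back_pressured_ms_per_s >= threshold_ms_per_s:
        return slow_interval_s  # heavily backpressured: checkpoint less often
    return fast_interval_s      # caught up: checkpoint frequently

print(pick_interval(900.0))  # mostly backpressured
print(pick_interval(50.0))   # barely backpressured
```

Dong's 1000-subtask objection earlier in the thread is precisely about how such a job-level backpressure signal would be aggregated across many subtasks, which this sketch deliberately leaves out.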
>> > > > > > > > > > > >>>>> >> > > > > > > > > > > >>>>> I have tried to achieve this goal. But the >> caveat >> > is >> > > > that >> > > > > > it >> > > > > > > > > requires >> > > > > > > > > > > >>>> much >> > > > > > > > > > > >>>>> more work than described above in order to give >> the >> > > > > configs >> > > > > > > > > > > >> well-defined >> > > > > > > > > > > >>>>> semantics. So I find it simpler to just use the >> > > > approach >> > > > > in >> > > > > > > > > FLIP-309. >> > > > > > > > > > > >>>>> >> > > > > > > > > > > >>>>> Let me explain my concern below. It will be >> great >> > if >> > > > you >> > > > > or >> > > > > > > > > someone >> > > > > > > > > > > >> else >> > > > > > > > > > > >>>>> can help provide a solution. >> > > > > > > > > > > >>>>> >> > > > > > > > > > > >>>>> 1) We need to clearly document when the >> > fast-interval >> > > > and >> > > > > > > > > slow-interval >> > > > > > > > > > > >>>>> will be used so that users can derive the >> expected >> > > > > behavior >> > > > > > > of >> > > > > > > > > the job >> > > > > > > > > > > >>>> and >> > > > > > > > > > > >>>>> be able to config these values. >> > > > > > > > > > > >>>>> >> > > > > > > > > > > >>>>> 2) The trigger of fast/slow interval depends on >> the >> > > > > > behavior >> > > > > > > of >> > > > > > > > > the >> > > > > > > > > > > >>>> source >> > > > > > > > > > > >>>>> (e.g. MySQL CDC, HybridSource). However, no >> > existing >> > > > > > concepts >> > > > > > > > of >> > > > > > > > > source >> > > > > > > > > > > >>>>> operator (e.g. boundedness) can describe the >> target >> > > > > > behavior. 
For example, MySQL CDC internally has two phases, namely the snapshot phase and the binlog phase, which are not explicitly exposed to its users via the source operator API. And we probably should not enumerate all internal phases of all source operators that are affected by the fast/slow interval.

3) An alternative approach might be to define a new concept (e.g. processingBacklog) that is applied to all source operators. Then the fast/slow interval's documentation can depend on this concept. That means we have to add a top-level concept (similar to source boundedness) and require all source operators to specify how they enforce this concept (e.g. FileSystemSource always emits processingBacklog=true). And there might be cases where the source itself (e.g. a bounded Kafka source) can not automatically derive the value of this concept, in which case we need to provide an option for users to explicitly specify the value for this concept on a per-source basis.

>> And it probably also has the issues of "having two places to configure checkpointing interval" and "giving flexibility for every source to implement a different API" (as mentioned below).
>
> No, it doesn't.
>
>> IMO, it is a hard requirement for the user-facing API to be clearly defined, and users should be able to use the API without concern of regression. And this requirement is more important than the other goals discussed above because it is related to the stability/performance of the production job. What do you think?
>
> I don't agree with this.
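To make option 3) concrete, here is a purely illustrative sketch of what a `processingBacklog` concept on sources might look like. Every name below is invented for illustration; the point is only that some sources can report the flag statically while others would need a user-supplied override, and the job is "in backlog" while any source still reports it.

```java
import java.util.List;

// Hypothetical top-level concept from point 3) above: every source reports
// whether it is still processing backlog. All names here are invented.
interface BacklogAwareSource {
    boolean isProcessingBacklog();
}

final class BacklogState {
    // A FileSystemSource-style source would always report backlog=true.
    static BacklogAwareSource fileSource() {
        return () -> true;
    }

    // A source that cannot derive the flag itself (e.g. a bounded Kafka
    // source) would need the value supplied explicitly by the user.
    static BacklogAwareSource withUserOverride(boolean userValue) {
        return () -> userValue;
    }

    // The job is considered "in backlog" while any source still reports backlog.
    static boolean jobProcessingBacklog(List<BacklogAwareSource> sources) {
        return sources.stream().anyMatch(BacklogAwareSource::isProcessingBacklog);
    }
}
```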
> There are many things that work somewhere in between perfectly and well enough in some fraction of use cases (maybe in 99%, maybe 95%, or maybe 60%), while still being very useful. Good examples are: selection of state backend, unaligned checkpoints, buffer debloating. But frankly, if I go through the list of currently available config options, something like half of them can cause regressions. Heck, even Flink itself doesn't work perfectly in 100% of the use cases, due to a variety of design choices. Of course, the more use cases are fine with said feature, the better, but we shouldn't fixate on perfectly covering 100% of the cases, as that's impossible.
> In this particular case, if the back-pressure monitoring trigger can work well enough in 95% of cases, I would say that's already better than the originally proposed alternative, which doesn't work at all if the user has a large backlog to reprocess from Kafka, including when using HybridSource AFTER the switch to Kafka has happened. For the remaining 5%, we should try to improve the behaviour over time, but ultimately, users can decide to just run a fixed checkpoint interval (or at worst use the hacky checkpoint trigger that I mentioned before a couple of times).
>
> Also, to be pedantic, if a user naively sets the slow-interval in your proposal to 30 minutes, when that user's job fails on average every 15-20 minutes, his job can end up in a state where it can not make any progress; this arguably is quite a serious regression.

I probably should not say it is a "hard requirement".
After all, there are pros/cons. We will need to consider implementation complexity, usability, extensibility, etc.

I just don't think we should take it for granted to introduce a regression for one use-case in order to support another use-case. If we can not find an algorithm/solution that addresses both use-cases well, I hope we can be open to tackling them separately so that users can choose the option that best fits their needs.

All things else being equal, I think it is preferable for a user-facing API to be clearly defined, and users should be able to use the API without concern of regression.

Maybe we can list the pros/cons of the alternative approaches we have been discussing and choose the best approach. And maybe we will end up finding that the use-case which needs CheckpointTrigger can be tackled separately from the use-case in FLIP-309.
>> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond = 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 * metricsUpdateInterval according to the above algorithm.
>>
>> Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
>
> It looks like there is indeed some mistake in my proposal above. Yours looks more correct; it probably still needs some safeguard/special handling if `backPressuredTimeMsPerSecond > 950`.
>
>> The only information it can access is the backlog.
>
> Again, no. It can access whatever we want to provide to it.
>
> Regarding the rest of your concerns: it's a matter of tweaking the parameters and the algorithm itself, and how much of a safety net we want to have. Ultimately, I'm pretty sure that's a (for 95-99% of cases) solvable problem.
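The corrected formula, with one possible safeguard for the `backPressuredTimeMsPerSecond > 950` edge case, can be written out as follows. The clamping constant (never divide by a fraction below 0.05) is an assumption for this sketch; the thread only notes that some special handling is needed.

```java
// Corrected estimate from the exchange above:
//   maxRecordsConsumedWithoutBackpressure =
//     (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
// Safeguard (assumed, not agreed in the thread): clamp the non-backpressured
// fraction so the estimate stays finite as backPressuredTimeMsPerSecond
// approaches 1000 ms/s.
final class BackpressureEstimate {
    static double maxRecordsWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds) {
        double nonBackpressuredFraction = 1.0 - backPressuredTimeMsPerSecond / 1000.0;
        // Special handling for backPressuredTimeMsPerSecond > 950:
        // never divide by less than 0.05.
        nonBackpressuredFraction = Math.max(nonBackpressuredFraction, 0.05);
        return numRecordsInPerSecond / nonBackpressuredFraction * metricsUpdateIntervalSeconds;
    }
}
```

With backPressuredTimeMsPerSecond = 0 this reduces to plain records-per-interval, matching the intent of the original question; at 50% backpressure the estimated full-speed throughput is twice the observed rate.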
> If not, there is always the hacky solution, which could even be integrated into the above-mentioned algorithm as a short circuit to always reach the `slow-interval`.
>
> Apart from that, you picked 3 minutes as the checkpointing interval in your counter-example. In most cases any interval above 1 minute would inflict pretty negligible overheads, so all in all, I would doubt there is a significant benefit (in most cases) of increasing a 3-minute checkpoint interval to anything more, let alone 30 minutes.

I am not sure we should design the algorithm with the assumption that the short checkpointing interval will always be higher than 1 minute, etc.

I agree the proposed algorithm can solve most cases, where resources are sufficient and there is no backlog in the source subtasks.
On the other hand, what makes SRE life hard is probably the remaining 1-5% of cases where the traffic is spiky and the cluster is reaching its capacity limit. The ability to predict and control a Flink job's behavior (including the checkpointing interval) can considerably reduce the burden of managing Flink jobs.

Best,
Dong

> Best,
> Piotrek
>
> On Sat, Jun 3, 2023 at 05:44 Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Piotr,
>>
>> Thanks for the explanations. I have some followup questions below.
>>
>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi All,
>>>
>>> Thanks for chipping in the discussion Ahmed!
>>>
>>> Regarding using the REST API.
>>> Currently I'm leaning towards implementing this feature inside Flink itself, via some pluggable interface. The REST API solution would be tempting, but I guess not everyone is using the Flink Kubernetes Operator.
>>>
>>> @Dong
>>>
>>>> I am not sure metrics such as isBackPressured are already sent to JM.
>>>
>>> Fetching code path on the JM:
>>>
>>>     org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
>>>     org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>>>
>>> Example code path accessing Task-level metrics via the JM using the `MetricStore`:
>>>     org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>>
>> Thanks for the code reference. I checked the code that invokes these two classes and found the following:
>>
>> - AggregatingSubtasksMetricsHandler#getStores is currently invoked only when AggregatingJobsMetricsHandler is invoked.
>> - AggregatingJobsMetricsHandler is only instantiated and returned by WebMonitorEndpoint#initializeHandlers.
>> - WebMonitorEndpoint#initializeHandlers is only used by RestServerEndpoint. And RestServerEndpoint invokes these handlers in response to external REST requests.
>>
>> I understand that the JM will get the backpressure-related metrics every time the RestServerEndpoint receives a REST request for these metrics.
>> But I am not sure whether RestServerEndpoint already receives such REST metric requests at a regular interval (suppose no human is manually opening/clicking the Flink Web UI). And if it does, what is the interval?
>
>>>> For example, let's say every source operator subtask reports this metric to the JM once every 10 seconds. There are 100 source subtasks. And each subtask is backpressured roughly 10% of the total time due to traffic spikes (and limited buffers). Then at any given time, there is a 1 - 0.9^100 = 99.997% chance that at least one subtask is backpressured. Then we have to wait for at least 10 seconds to check again.
>>>
>>> backPressuredTimeMsPerSecond and other related metrics (like busyTimeMsPerSecond) are not subject to that problem.
>>> They are recalculated once every metric-fetching interval, and they report accurately, on average, how much time the given subtask spent busy/idling/backpressured. In your example, backPressuredTimeMsPerSecond would report 100ms/s.
>>
>> Suppose every subtask is already reporting backPressuredTimeMsPerSecond to the JM once every 100 ms. If a job has 10 operators (that are not chained) and each operator has 100 subtasks, then the JM would need to handle 10000 requests per second to receive metrics from these 1000 subtasks. That seems like a non-trivial overhead for medium-to-large-sized jobs and can make the JM the performance bottleneck during job execution.
>>
>> I would be surprised if Flink is already paying this much overhead just for metrics monitoring. That is the main reason I still doubt it is true. Can you show where this 100 ms is currently configured?
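The two back-of-the-envelope numbers in this exchange can be checked directly; the snippet below reproduces just the arithmetic from the thread, nothing more.

```java
// Arithmetic from the exchange above.
final class ThreadArithmetic {
    // 100 subtasks, each backpressured 10% of the time: probability that at
    // least one is backpressured at a random instant, i.e. 1 - 0.9^100.
    static double probAnyBackpressured(int subtasks, double perSubtaskProb) {
        return 1.0 - Math.pow(1.0 - perSubtaskProb, subtasks);
    }

    // 10 operators x 100 subtasks, each reporting every 100 ms
    // => requests per second arriving at the JM.
    static long reportsPerSecond(int operators, int subtasksPerOperator, long reportIntervalMs) {
        return (long) operators * subtasksPerOperator * (1000 / reportIntervalMs);
    }
}
```

Both figures quoted in the thread check out: roughly 99.997% and 10000 requests per second respectively.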
>> Alternatively, maybe you mean that we should add extra code to invoke the REST API at a 100 ms interval. That would mean considerably increasing the network/CPU overhead on the JM, where the overhead grows as the number of TMs/slots increases, which may pose a risk to the scalability of the proposed design. I am not sure we should do this. What do you think?
>
>>>> While it would be nice to support additional use-cases with one proposal, it is probably also reasonable to make incremental progress and support the low-hanging-fruit use-case first. The choice really depends on the complexity and the importance of supporting the extra use-cases.
>>> That would be true if it were a private implementation detail, or if the low-hanging-fruit solution were on the direct path to the final solution. That's unfortunately not the case here. This will add a public-facing API that we will later need to maintain, no matter what the final solution will be, and at the moment at least I don't see it being related to a "perfect" solution.
>>
>> Sure. Then let's decide the final solution first.
>
>>>> I guess the point is that the suggested approach, which dynamically determines the checkpointing interval based on the backpressure, may cause a regression when the checkpointing interval is relatively low. This makes it hard for users to enable this feature in production.
>>>> It is like an auto-driving system that is not guaranteed to work.
>>>
>>> Yes, creating a more generic solution that requires less configuration is usually more difficult than static configurations. That doesn't mean we shouldn't try. Especially since, if my proposed algorithm doesn't work well enough, there is an obvious solution: any source could add a metric, like say "processingBacklog: true/false", and the `CheckpointTrigger` could use this as an override to always switch to the "slowCheckpointInterval". I don't think we need it, but that's always an option that would be basically equivalent to your original proposal. Or a source could even add "suggestedCheckpointInterval: int", and the `CheckpointTrigger` could use that value, if present, as a hint in one way or another.
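A purely illustrative sketch of the override and hint described above: the interface, method names, and decision rule are all assumptions made for illustration (they correspond to exactly the kind of open questions raised in the thread), not an existing or agreed Flink API.

```java
import java.util.OptionalLong;

// Illustrative-only sketch of a trigger consuming the two hints discussed
// above; none of these names exist in Flink.
interface TriggerContext {
    boolean processingBacklog();                  // override hint from the source
    OptionalLong suggestedCheckpointIntervalMs(); // optional finer-grained hint
}

final class BacklogOverrideTrigger {
    final long fastIntervalMs;
    final long slowIntervalMs;

    BacklogOverrideTrigger(long fastIntervalMs, long slowIntervalMs) {
        this.fastIntervalMs = fastIntervalMs;
        this.slowIntervalMs = slowIntervalMs;
    }

    long nextIntervalMs(TriggerContext ctx) {
        // "processingBacklog: true" short-circuits to the slow interval.
        if (ctx.processingBacklog()) {
            return slowIntervalMs;
        }
        // Otherwise a suggested interval (if any) is used as a hint,
        // bounded by the configured fast/slow range.
        OptionalLong suggested = ctx.suggestedCheckpointIntervalMs();
        return suggested.isPresent()
                ? Math.max(fastIntervalMs, Math.min(slowIntervalMs, suggested.getAsLong()))
                : fastIntervalMs;
    }
}
```

Under this sketch, the backlog flag alone is equivalent to the FLIP-309 proposal (a static slow interval during backlog), while the suggested-interval hint would allow finer control.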
>> So far we have talked about the possibility of using a CheckpointTrigger and mentioned that the CheckpointTrigger can read metric values.
>>
>> Can you help answer the following questions so that I can understand the alternative solution more concretely:
>>
>> - What is the interface of this CheckpointTrigger? For example, are we going to give the CheckpointTrigger a context that it can use to fetch arbitrary metric values? This can help us understand what information this user-defined CheckpointTrigger can use to make the checkpoint decision.
>> - Where does this CheckpointTrigger run? For example, is it going to run on the subtask of every source operator? Or is it going to run on the JM?
>> - Are we going to provide a default implementation of this CheckpointTrigger in Flink that implements the algorithm described below, or do we expect each source operator developer to implement their own CheckpointTrigger?
>> - How can users specify the fastCheckpointInterval/slowCheckpointInterval? For example, will we provide APIs on the CheckpointTrigger that end-users can use to specify the checkpointing interval? What would that look like?
>>
>> Overall, my gut feeling is that the alternative approach based on CheckpointTrigger is more complicated and harder to use. And it probably also has the issues of "having two places to configure the checkpointing interval" and "giving flexibility for every source to implement a different API" (as mentioned below).
>>
>> Maybe we can evaluate it more after knowing the answers to the above questions.
>>>> On the other hand, the approach currently proposed in the FLIP is much simpler, as it does not depend on backpressure. Users specify the extra interval requirement on the specific sources (e.g. HybridSource, MySQL CDC Source) and can easily know which checkpointing interval will be used during the continuous phase of the corresponding source. This is pretty much the same as how users use the existing execution.checkpointing.interval config. So there is no extra concern of regression caused by this approach.
>> To an extent, but as I have already previously mentioned, I really really do not like the idea of:
>> - having two places to configure the checkpointing interval (config file and in the Source builders)
>> - giving flexibility for every source to implement a different API for that purpose
>> - creating a solution that is not generic enough, so that we will need a completely different mechanism in the future anyway
>
> Yeah, I understand different developers might have different concerns/tastes for these APIs. Ultimately, there might not be a perfect solution and we have to choose based on the pros/cons of these solutions.
> I agree with you that, all things being equal, it is preferable to 1) have one place to configure checkpointing intervals, 2) have all source operators use the same API, and 3) create a solution that is generic and long-lasting. Note that these three goals affect the usability and extensibility of the API, but not necessarily the stability/performance of the production job.
>
> BTW, there are also other preferable goals. For example, it is very useful for the job's behavior to be predictable and interpretable, so that SREs can operate/debug Flink in an easier way. We can list these pros/cons altogether later.
>
> I am wondering if we can first agree on the priority of the goals we want to achieve.
> IMO, it is a hard requirement for the user-facing API to be clearly defined, and users should be able to use the API without concern of regression. And this requirement is more important than the other goals discussed above because it is related to the stability/performance of the production job. What do you think?
>
>>> Sounds good. Looking forward to learning more ideas.
>>
>> I have thought about this a bit more, and I think we don't need to check the backpressure status, or how overloaded all of the operators are. We could just check three things for source operators:
>> 1. pendingRecords (backlog length)
>> 2. numRecordsInPerSecond
>> 3. backPressuredTimeMsPerSecond
>>
>> // int metricsUpdateInterval = 10s // obtained from config
>> // The next line calculates how many records we can consume from the backlog,
>> // assuming that magically the reason behind the backpressure vanishes. We will
>> // use this only as a safeguard against scenarios like, for example, backpressure
>> // caused by some intermittent failure/performance degradation.
>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond
>>     / (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>
> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond = 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 * metricsUpdateInterval according to the above algorithm.
>
> Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
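[Editorial illustration: Dong's observation can be checked numerically. The plain-Java sketch below is not code from the thread or from Flink; it just evaluates both forms of the expression with backPressuredTimeMsPerSecond = 0.]

```java
// Numeric check of the two formulas discussed above (plain Java, not Flink code).
// With no backpressure the source should be able to consume
// numRecordsInPerSecond * metricsUpdateInterval records per metrics interval.
public class BackpressureFormulaCheck {

    // The expression as written: divides by (1000 - backPressuredTimeMsPerSecond / 1000).
    static double asWritten(double recordsPerSec, double backPressuredMsPerSec, double intervalSec) {
        return (recordsPerSec / (1000 - backPressuredMsPerSec / 1000)) * intervalSec;
    }

    // The suggested correction: (1 - backPressuredTimeMsPerSecond / 1000) is the
    // fraction of each second during which the source is not backpressured.
    static double corrected(double recordsPerSec, double backPressuredMsPerSec, double intervalSec) {
        return (recordsPerSec / (1 - backPressuredMsPerSec / 1000)) * intervalSec;
    }

    public static void main(String[] args) {
        // numRecordsInPerSecond = 1000, no backpressure, metricsUpdateInterval = 10s:
        System.out.println(asWritten(1000, 0, 10));  // 10.0    (off by a factor of 1000)
        System.out.println(corrected(1000, 0, 10));  // 10000.0 (the expected 1000 * 10 records)
    }
}
```

Only the corrected form yields the expected numRecordsInPerSecond * metricsUpdateInterval records when backpressure is zero.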
>> // We are excluding maxRecordsConsumedWithoutBackpressure from the backlog as
>> // a safeguard against intermittent backpressure problems, so that we don't
>> // calculate the next checkpoint interval far, far in the future while the
>> // backpressure goes away before we recalculate the metrics and the new
>> // checkpointing interval.
>> timeToConsumeBacklog = (pendingRecords
>>     - maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
>>
>> Then we can use those numbers to calculate the desired checkpointing interval, for example like this:
>>
>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; // this may need some refining
>> long nextCheckpointInterval = min(max(fastCheckpointInterval, calculatedCheckpointInterval), slowCheckpointInterval);
>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
>>
>> WDYT?
>
> I think the idea of the above algorithm is to lean toward using the fastCheckpointInterval unless we are very sure the backlog will take a long time to process. This can alleviate the concern of regression during the continuous_unbounded phase, since we are more likely to use the fastCheckpointInterval. However, it can cause regression during the bounded phase.
>
> I will use a concrete example to explain the risk of regression:
> - The user is using HybridSource to read from HDFS followed by Kafka. The data in HDFS is old and there is no need for data freshness for the data in HDFS.
> - The user configures the job as below:
>   - fastCheckpointInterval = 3 minutes
>   - slowCheckpointInterval = 30 minutes
>   - metricsUpdateInterval = 100 ms
>
> Using the above formula, we know that once pendingRecords <= numRecordsInPerSecond * 30 minutes, then calculatedCheckpointInterval <= 3 minutes, meaning that we will use fastCheckpointInterval as the checkpointing interval. Then in the last 30 minutes of the bounded phase, the checkpointing frequency will be 10X higher than what the user wants.
>
> Also note that the same issue would considerably limit the benefits of the algorithm. For example, during the continuous phase, the algorithm will only be better than the approach in FLIP-309 when there is at least 30 minutes' worth of backlog in the source.
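[Editorial illustration: the example numbers can be plugged into the pseudocode above directly. The self-contained plain-Java sketch below uses the corrected backpressure term and seconds as the time unit; it is a sketch of the proposal under discussion, not actual Flink code. It shows that with exactly 30 minutes' worth of backlog the clamp picks the fast interval.]

```java
// Sketch of the proposed interval selection, with all times in seconds.
// Uses the corrected denominator (1 - backPressuredTimeMsPerSecond / 1000).
public class CheckpointIntervalSketch {

    static long nextCheckpointInterval(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateInterval,
            long fastCheckpointInterval,
            long slowCheckpointInterval) {
        // Records we could drain in one metrics interval if backpressure vanished.
        double maxRecordsConsumedWithoutBackpressure =
                (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000))
                        * metricsUpdateInterval;
        double timeToConsumeBacklog =
                Math.max(0, pendingRecords - maxRecordsConsumedWithoutBackpressure)
                        / numRecordsInPerSecond;
        long calculatedCheckpointInterval = (long) (timeToConsumeBacklog / 10);
        // Clamp the result between the fast and slow intervals.
        return Math.min(
                Math.max(fastCheckpointInterval, calculatedCheckpointInterval),
                slowCheckpointInterval);
    }

    public static void main(String[] args) {
        long fast = 3 * 60;   // fastCheckpointInterval = 3 minutes
        long slow = 30 * 60;  // slowCheckpointInterval = 30 minutes
        // Exactly 30 minutes' worth of backlog at 1000 records/s, no backpressure:
        // timeToConsumeBacklog ~ 30 min, calculatedCheckpointInterval ~ 3 min,
        // so the clamp picks the fast interval (180 s) rather than the slow one.
        System.out.println(nextCheckpointInterval(
                1000L * 30 * 60, 1000, 0, 0.1, fast, slow));  // 180
    }
}
```

This matches Dong's point: the slow interval is only selected once the backlog exceeds roughly slowCheckpointInterval * 10 worth of processing time, so most of the bounded phase runs at the fast interval.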
> Sure, having a slower checkpointing interval in this extreme case (where there is a 30-minute backlog in the continuous_unbounded phase) is still useful when it happens. But since this is the uncommon case, and the right solution is probably to do capacity planning to avoid it happening in the first place, I am not sure it is worth optimizing for this case at the cost of regression in the bounded phase and the reduced operational predictability for users (e.g. what checkpointing interval should I expect at this stage of the job?).
>
> I think the fundamental issue with this algorithm is that it is applied to both the bounded phase and the continuous_unbounded phase without knowing which phase the job is running in. The only information it can access is the backlog.
> But two sources with the same amount of backlog do not necessarily have the same data freshness requirement.
>
> In this particular example, users know that the data in HDFS is very old and there is no need for data freshness. Users can express such signals via the per-source API proposed in the FLIP. This is why the current approach in FLIP-309 can be better in this case.
>
> What do you think?
>
> Best,
> Dong
>
>> Best,
>> Piotrek