Hi Piotr,

Do you have any suggestions on how we can practically move forward to
address the target use-case?

My understanding is that the current proposal does not have any
correctness/performance issues. And it can be extended to support all the
extra use-cases without having to throw away the proposed APIs.

If you prefer a better solution, with simpler APIs and the same or better
correctness/performance for the target use-case, could you please explain
its API design so that we can continue the discussion?


Best,
Dong

On Mon, Jul 3, 2023 at 6:39 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Please see my comments inline.
>
> On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
>> Hi Dong,
>>
>> Starting from the end:
>>
>> > It seems that the only benefit of this approach is to avoid
>> > adding SplitEnumeratorContext#setIsProcessingBacklog.
>>
>> Yes, that's the major benefit of this counter-proposal.
>>
>> > In the target use-case, users still want to checkpoint (though at a
>> > larger interval) when there is backlog. And HybridSource needs to know
>> > the expected checkpoint interval during backlog in order to determine
>> > whether it should keep throwing CheckpointException. Thus, we still need
>> > to add execution.checkpointing.interval-during-backlog for users to
>> > specify this information.
>> >
>> > The downside of this approach is that it is hard to enforce the
>> > semantics specified by execution.checkpointing.interval-during-backlog.
>> > For example, suppose execution.checkpointing.interval = 3 minutes and
>> > execution.checkpointing.interval-during-backlog = 7 minutes. During the
>> > backlog phase, the checkpoint coordinator will still trigger a
>> > checkpoint once every 3 minutes. HybridSource will need to reject 2 out
>> > of every 3 checkpoint invocations, and the effective checkpoint interval
>> > will be 9 minutes.
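To make the arithmetic concrete, a small sketch (hypothetical helper, not Flink code): since checkpoints can only succeed on the coordinator's ticks, the interval the source effectively enforces by rejecting triggers rounds the desired interval up to the next multiple of the base interval.

```python
# Sketch (hypothetical, not Flink API): effective checkpoint interval when a
# source rejects triggers until `desired` minutes have elapsed since the last
# successful checkpoint, while the coordinator fires every `base` minutes.
import math

def effective_interval(base: float, desired: float) -> float:
    """Checkpoints only succeed on coordinator ticks, so the real interval
    rounds `desired` up to the next multiple of `base`."""
    return math.ceil(desired / base) * base

# With the example above: base 3 min, desired 7 min -> 9 min effective,
# i.e. 2 out of every 3 triggers are rejected.
```

This is why the exact value of the longer interval is hard to honor precisely under the rejection approach.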
>>
>> Does the exact value of the longer interval really matter? Can we not
>> hard-code it to be 5x or 10x of the base checkpoint interval? If there is
>> a noticeable overhead from the base interval slowing down the record
>> processing rate, increasing this interval by a factor of 5x or 10x would
>> fix the performance issue for the vast majority of users. So a source
>> could just skip 4 out of 5 or 9 out of 10 checkpoints.
>>
>
> Yes, I think the exact value of the longer interval matters.
>
> The main reason we need two intervals is for jobs which have two-phase
> commit sink. The short interval typically represents the interval that a
> user can accept for the two-phase commit sink to buffer data (since it can
> only emit data when a checkpoint is triggered). And the long interval
> typically represents the maximum amount of duplicate work (in terms of
> time) that a job needs to redo after failover.
>
> Since there is no intrinsic relationship between the data buffer interval
> (related to processing latency) and the failover boundary, I don't think we
> can hardcode it to be 5x or 10x of the base checkpoint interval.
>
>
>> Alternatively we could introduce a config option like:
>>
>> execution.checkpointing.long-interval
>>
>> that might be re-used in the future with fancier algorithms, but I don't
>> see much value in doing that.
>
>
>> > Overall, I think the solution is a bit hacky. I think it is preferred
>> > to throw an exception only when there is indeed an error. If we don't
>> > need to take a checkpoint, it is preferred to not trigger the checkpoint
>> > in the first place. And I think adding
>> > SplitEnumeratorContext#setIsProcessingBacklog is probably not that much
>> > of a big deal.
>>
>> Yes, it's hacky, but at least it doesn't require extending the public
>> API for a quite limited solution that only targets one or two sources
>> that are rarely used.
>>
>
> I am not sure it is fair to say MySQL CDC source is "rarely used".
> ververica/flink-cdc-connectors GitHub repo has 4K+ stars. Also, note that
> the proposed feature can be useful for CDC sources with an internal
> "backlog phase". Its usage is not limited to just the two sources mentioned
> in the FLIP.
>
>
>>
>> ================
>>
>> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
>> feeling that
>> this is overly complicated and would require every operator/function to
>> handle that.
>>
>> Yes, it would cover even more use cases, at the cost of complicating the
>> system by a lot. IMO it looks like something we could do if there were
>> indeed high demand for such a feature, after we provide some baseline
>> generic solution that doesn't require any configuration.
>>
>> I have a feeling that by just statically looking at the shape of the job
>> graph and how
>> it is connected, we could deduce almost the same things.
>>
>
> Note that pretty much every FLIP will address its use-case at the cost
> of complicating the system. I understand you have the feeling that this
> complexity is not worthwhile. However, as we can see from the comments in
> this thread and the votes in the voting thread, many
> committers/developers/users actually welcome the feature introduced in this
> FLIP.
>
> I am happy to work with you together to find a more generic and simpler
> solution, as long as that solution can address the target use-case without
> hurting user-experience. The alternative solution which you have mentioned
> so far, unfortunately, still has drawbacks as mentioned earlier.
>
>
>> Also:
>>
>> >  - the FLIP suggests to use the long checkpointing interval as long as
>> >  any subtask is processing the backlog. Are you sure that's the right
>> >  call? What if other sources are producing fresh records, and those
>> >  fresh records are reaching sinks? It could happen either with a
>> >  disjoint JobGraph, an embarrassingly parallel JobGraph (no
>> >  keyBy/unions/joins), or even with keyBy. Fresh records can slip
>> >  through a generally backpressured keyBy exchange using a
>> >  non-backpressured input channel. How should we handle that? This
>> >  problem I think will affect every solution, including my previously
>> >  proposed generic one, but we should discuss how to handle that as
>> >  well.
>>
>> By this I didn't necessarily mean that we have to solve it right now.
>>
>> ================
>>
>> > The comments above seem kind of "abstract". I am hoping to understand
>> > more technical details behind these comments so that we can see how to
>> > address the concern.
>>
>> Over the span of this discussion I think I have already explained many
>> times what
>> bothers me in the current proposal.
>>
>
> If I understand you correctly, your concern is that the solution is not
> generic, can not address the extra use-case you want, and is too
> complicated.
>
> I think it is fair to say that no FLIP can be fully generic and address
> all use-cases, just like the fact that Flink is still not perfect and
> still needs FLIPs to improve its performance/usability. And whether an API
> is too complicated really depends on whether there exists a better option.
>
> Fairly speaking, different people can have different opinions on whether a
> proposal is generic and whether it is too complicated. At least from the
> comments from other developers in this thread and in the voting thread,
> many developers and users actually like the current proposal. I hope you
> understand that your concerns mentioned above are subjective and not
> necessarily shared by other developers.
>
> Honestly speaking, if we block this FLIP just because anyone thinks it can
> be better (yet without any concrete proposal for making it better), I feel
> it is not a good result for other developers and users who would like to
> have this feature to address their existing pain points.
>
> I am wondering if you can be a bit more lenient, consider the opinion of
> other developers (not just me) who have voted, and allow us to make
> incremental progress even though you might find it not meeting your
> expectations in its current form?
>
>
>> > For example, even if a FLIP does not address all use-cases
>> > (which is arguably true for every FLIP), its solution does not
>> necessarily
>> > need to be thrown away later as long as it is extensible
>>
>> That's my main point. I haven't yet seen a proposal from this FLIP that
>> could extend FLIP-309 to cover the generic use case and:
>>  - Would work out of the box for all or the majority of properly
>> implemented sources.
>>
>
> I would argue that for the target use-case mentioned in the motivation
> section, it is impossible to address the use-case without any code change
> from the source, and still have the same stability as the current proposal.
> The reason is that when the source does not have event-time, we can not
> correctly derive whether the MySQL CDC source is in the snapshot/binlog
> phase by just looking at the processing-time related metrics or
> backpressure.
>
> We have discussed this issue in detail in the earlier emails of this
> thread. I also mentioned that I will add follow-up FLIPs to make use of the
> event-time metrics and backpressure metrics to derive backlog status. But
> that can not replace the capability for source to explicitly specify its
> metrics.
>
>
>
>>  - Would require zero or very minimal configuration/input from the user.
>> Especially
>>
>
> Note that the current proposal only requires the user to specify one extra
> config, namely execution.checkpointing.interval-during-backlog. I don't
> think we are able to reduce the extra config to zero, due to the reason
> explained above (i.e. separate intervals for failover and data freshness).
> Therefore, the configuration required from users is already minimal.
>
>
>>    wouldn't require implementing some custom things in every source.
>>  - Could be made to work well enough in the (vast?) majority of the use
>> cases.
>
>
> I think you are talking about the case of delaying checkpoints when
> backpressure is high etc. I would argue this is a use-case not targeted
> by this FLIP, and it can be addressed in a follow-up FLIP.
>
>
>>
>
>
>> > So we probably need to understand specifically why the proposed APIs
>> would be thrown away.
>>
>> As I have mentioned many times, that's the case because:
>> 1. This solution is not generic enough
>> 2. I can see solutions that wouldn't require modification of every source
>> 3. They would have zero overlap with the interfaces extension from this
>> FLIP
>>
>
>
> I have the feeling our discussion is kind of in a loop, where you ask for
> a solution without any change to the source (so that it is generic), I
> explain why I am not able to find such a solution and the drawbacks of your
> proposed solution, and then you repeat the same ask and insist this is
> possible.
>
> If you can find a solution that wouldn't require modification of every
> source and still address the target use-case well, could you please kindly
> rephrase your solution so that we can revisit it?
>
> I assume this solution would not require extra config from users, would
> not cause the job to use long checkpoint interval due to random/short
> traffic spikes, and would not cause the job to use the short interval when
> the job is still reading backlog data.
>
> I would be happy to be proven wrong if you can provide such a
> solution without the aforementioned drawbacks. I just hope we don't block
> the FLIP forever for a goal that no one can address.
>
> Best,
> Dong
>
>
>>
>> Best,
>> Piotrek
>>
>> On Sat, Jul 1, 2023 at 17:01 Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hi Piotr,
>> >
>> > Thank you for providing further suggestions to help improve the API.
>> Please
>> > see my comments inline.
>> >
>> > On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org>
>> > wrote:
>> >
>> > > Hey,
>> > >
>> > > Sorry for a late reply, I was OoO for a week. I have three things to
>> > point
>> > > out.
>> > >
>> > > 1. ===============
>> > >
>> > > The updated proposal is indeed better, but to be honest I still don't
>> > > like it, for mostly the same reasons that I have mentioned earlier:
>> > > - only a partial solution that doesn't address all use cases, so we
>> > > would need to throw it away sooner or later
>> > > - I don't see, and it hasn't been discussed, how to make it work out
>> > > of the box for all sources
>> > > - somewhat complicating the API for people implementing Sources
>> > > - it should work out of the box for most of the sources, or at least
>> > > have that potential in the future
>> > >
>> >
>> > The comments above seem kind of "abstract". I am hoping to understand
>> > more technical details behind these comments so that we can see how to
>> > address the concern. For example, even if a FLIP does not address all
>> > use-cases (which is arguably true for every FLIP), its solution does
>> > not necessarily need to be thrown away later as long as it is
>> > extensible. So we probably need to understand specifically why the
>> > proposed APIs would be thrown away.
>> >
>> > Similarly, we would need to understand if there is a better design to
>> > make the API simpler and work out of the box, etc., in order to decide
>> > how to address these comments.
>> >
>> >
>> > > On top of that:
>> > > - the FLIP I think is missing how to hook up SplitEnumeratorContext
>> and
>> > > CheckpointCoordinator to pass "isProcessingBacklog"
>> > >
>> >
>> > I think it can be passed via the following function chain:
>> > - CheckpointCoordinator invokes
>> > OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
>> > coordinatorsToCheckpoint) to get this information.
>> > - OperatorCoordinatorHolder implements
>> > OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
>> > OperatorCoordinator#isProcessingBacklog (via coordinator)
>> > - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog
>> and
>> > returns SourceCoordinatorContext#isProcessingBacklog
>> > - SourceCoordinatorContext will implement
>> > SplitEnumeratorContext#setIsProcessingBacklog and stores the given
>> > information in a variable.
>> >
>> > Note that it involves only internal APIs. We might be able to find a
>> > simpler solution with fewer functions on the path. As long as the above
>> > solution works without any performance or correctness issues, I think
>> > maybe we should focus on the public API design and discuss the
>> > implementation in the PR review?
>> >
>> > > - the FLIP suggests to use the long checkpointing interval as long
>> > >   as any subtask is processing the backlog. Are you sure that's the
>> > >   right call? What if other sources are producing fresh records, and
>> > >   those fresh records are reaching sinks? It could happen either with
>> > >   a disjoint JobGraph, an embarrassingly parallel JobGraph (no
>> > >   keyBy/unions/joins), or even with keyBy. Fresh records can slip
>> > >   through a generally backpressured keyBy exchange using a
>> > >   non-backpressured input channel. How should we handle that? This
>> > >   problem I think will affect every solution, including my previously
>> > >   proposed generic one, but we should discuss how to handle that as
>> > >   well.
>> > >
>> >
>> > Good question. Here is my plan to improve the solution in a follow-up
>> FLIP:
>> >
>> > - Let every subtask of every source operator emit
>> > RecordAttributes(isBacklog=..).
>> > - Let every subtask of every operator handle the RecordAttributes
>> > received from its inputs and emit RecordAttributes to downstream
>> > operators. Flink runtime can derive this information for every
>> > one-input operator. For an operator with two inputs, if one input has
>> > isBacklog=true and the other has isBacklog=false, the operator should
>> > determine the isBacklog for its output records based on its semantics.
>> > - If there exists a subtask of a two-phase commit operator with
>> > isBacklog=false, the operator should let the JM know, so that the JM
>> > will use the short checkpoint interval (for data freshness). Otherwise,
>> > the JM will use the long checkpoint interval.
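The propagation and interval-selection rules above can be sketched roughly as follows (all names are hypothetical, not Flink APIs; the mixed-input default is just one possible choice):

```python
# Sketch (hypothetical names, not Flink APIs) of the RecordAttributes
# propagation and checkpoint-interval selection described above.

def one_input_backlog(input_is_backlog: bool) -> bool:
    # One-input operators: the runtime can forward the attribute as-is.
    return input_is_backlog

def two_input_backlog(left: bool, right: bool, prefer_fresh: bool = True) -> bool:
    # Two-input operators: the combination depends on operator semantics.
    # A possible default treats the output as fresh if either input is fresh.
    if left == right:
        return left
    return not prefer_fresh

def jm_checkpoint_interval(tpc_subtask_backlog_flags, short_interval, long_interval):
    # The JM uses the short interval if any two-phase-commit subtask
    # reports isBacklog=false (i.e. sees fresh data); otherwise the long one.
    if any(not is_backlog for is_backlog in tpc_subtask_backlog_flags):
        return short_interval
    return long_interval
```

For example, with two sink subtasks both in backlog the JM would pick the long interval, and as soon as one subtask reports fresh data it would switch back to the short one.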
>> >
>> > The above solution guarantees that, if every two-input operator has
>> > explicitly specified its isBacklog based on the inputs' isBacklog, then
>> > the JM will use the short checkpoint interval if and only if it is
>> > useful for at least one subtask of one two-phase commit operator.
>> >
>> > Note that even the above solution might not be perfect. Suppose one
>> > subtask of the two-phase commit operator has isBacklog=false, but every
>> > other subtask of this operator has isBacklog=true, due to load
>> > imbalance. In this case, it might be beneficial to use the long
>> > checkpoint interval to improve the average data freshness for this
>> > operator. However, as we get into more edge cases, the solution will
>> > become more complicated (e.g. providing more APIs for users to specify
>> > their intended strategy) and there will be less additional benefit
>> > (because these scenarios are less common).
>> >
>> > Also, note that we can support the solution described above without
>> > throwing away any public API currently proposed in FLIP-309. More
>> > specifically, we still need
>> > execution.checkpointing.interval-during-backlog. Sources such as
>> > HybridSource and the MySQL CDC source can still use
>> > setIsProcessingBacklog() to specify the backlog status. We just need to
>> > update setIsProcessingBacklog() to emit RecordAttributes(isBacklog=..)
>> > upon invocation.
>> >
>> > I hope the above solution is reasonable and can address most of your
>> > concerns. And I hope we can use FLIP-309 to solve a large chunk of the
>> > existing problems in Flink 1.18 release and make further improvements in
>> > followup FLIPs. What do you think?
>> >
>> >
>> >
>> > > 2. ===============
>> > >
>> > > Regarding the current proposal, there might be a way to make it
>> > > actually somehow generic (but not pluggable). But it might require
>> > > slightly different interfaces. We could keep the idea that
>> > > SourceCoordinator/SplitEnumerator is responsible for switching between
>> > > slow/fast processing modes. It could be implemented to achieve
>> > > something like in the FLIP-309 proposal, but apart from that, the
>> > > default behaviour would be a built-in mechanism working like this:
>> > > 1. Every SourceReaderBase checks its metrics and its state to decide
>> > > if it considers itself "processingBacklog" or "veryBackpressured". The
>> > > base implementation could do it via a similar mechanism as I was
>> > > proposing previously, by looking at busy/backPressuredTimeMsPerSecond,
>> > > pendingRecords and the processing rate.
>> > > 2. SourceReaderBase could send an event with the
>> > > "processingBacklog"/"veryBackpressured" state.
>> > > 3. SourceCoordinator would collect those events and decide what it
>> > > should do, i.e. whether it should switch the whole source to the
>> > > "processingBacklog"/"veryBackpressured" state or not.
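A rough sketch of those steps (the metric names follow the ones mentioned above, but the thresholds and the coordinator's switching rule are made-up assumptions, not anything specified here):

```python
# Sketch (hypothetical thresholds and switching rule) of a
# SourceReaderBase-style per-reader decision plus coordinator aggregation.

def reader_state(back_pressured_ms_per_sec: float,
                 pending_records: int,
                 processing_rate: float) -> str:
    # Rough time (seconds) needed to drain the currently pending records.
    catch_up_secs = pending_records / processing_rate if processing_rate > 0 else float("inf")
    if back_pressured_ms_per_sec > 900:      # >90% of each second back-pressured
        return "veryBackpressured"
    if catch_up_secs > 600:                  # more than 10 minutes behind
        return "processingBacklog"
    return "normal"

def coordinator_decision(reader_states) -> str:
    # One possible rule: switch the whole source only if every reader
    # reports that it is behind; otherwise keep the normal mode.
    if all(state != "normal" for state in reader_states):
        return "processingBacklog"
    return "normal"
```

Whether the coordinator should require all readers, a majority, or any single reader to be behind is exactly the kind of policy question this counter-proposal leaves open.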
>> > >
>> > > That could eventually provide a generic solution that works for
>> > > every source that reports the required metrics. Each source
>> > > implementation could decide whether to use that default behaviour, or
>> > > if maybe it's better to override the default, or combine the default
>> > > with something custom (like HybridSource).
>> > >
>> > > And as a first step, we could implement that mechanism only on the
>> > > SourceCoordinator side, without events, without the default generic
>> > > solution, and use it in the HybridSource/MySQL CDC.
>> > >
>> > > This approach has some advantages compared to my previous proposal:
>> > >   + no need to tinker with metrics and pushing metrics from TMs to JM
>> > >   + somehow communicating this information via events seems a bit
>> > > cleaner to me and avoids problems with the freshness of the metrics
>> > > And some issues:
>> > >   - I don't know if it can be made pluggable in the future, i.e. if a
>> > > user could implement a custom `CheckpointTrigger` that would
>> > > automatically work with all/most of the pre-existing sources.
>> > >   - I don't know if it can be expanded if needed in the future, to
>> > > make decisions based on operators in the middle of a jobgraph.
>> > >
>> >
>> > Thanks for the proposal. Overall, I agree it is valuable to be able to
>> > determine the isProcessingBacklog based on the source reader metrics.
>> >
>> > I would suggest making the following changes to your idea:
>> > - Instead of letting the source reader send events to the source
>> > coordinator, the source reader can emit RecordAttributes(isBacklog=..)
>> > as described earlier. We will let the two-phase commit operators decide
>> > whether they need the short checkpoint interval.
>> > - We consider isProcessingBacklog=true when the watermarkLag is larger
>> > than a threshold.
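A minimal sketch of that rule (the threshold value and the helper name are made-up assumptions; the actual threshold would come from user configuration):

```python
# Sketch (hypothetical name/threshold): treat the source as processing
# backlog when the watermark lags behind the current time by more than a
# configured threshold.

def is_processing_backlog(now_ms: int, watermark_ms: int,
                          lag_threshold_ms: int = 5 * 60 * 1000) -> bool:
    return (now_ms - watermark_ms) > lag_threshold_ms
```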
>> >
>> > This is a nice addition. But I think we still need extra information
>> > from users (e.g. the threshold for deciding whether the watermarkLag or
>> > backPressuredTimeMsPerSecond is too high) with extra public APIs for
>> > this feature to work reliably. This is because there is no default
>> > algorithm that works in all cases without extra specification from
>> > users, due to the issues around the default algorithm we discussed
>> > previously.
>> >
>> > Overall, I think the current proposal in FLIP-309 is a first step
>> > towards addressing these problems. The API for the source enumerator to
>> > explicitly set isProcessingBacklog based on its status is useful even if
>> > we can support metrics-based solutions.
>> >
>> > If that looks reasonable, can we agree to make incremental improvements
>> > and work on the metrics-based solution in a follow-up FLIP?
>> >
>> >
>> > >
>> > > 3. ===============
>> > >
>> > > Independent of that, during some brainstorming between me, Chesnay
>> > > and Stefan Richter, an idea popped up that I think could be a counter
>> > > proposal, as an intermediate solution that probably effectively works
>> > > the same way as the current FLIP-309.
>> > >
>> > > Inside a HybridSource, from its SplitEnumerator#snapshotState method,
>> > > can you not throw an exception like
>> > > `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or `new
>> > > CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
>> > > Actually, we could also introduce a dedicated
>> > > `CheckpointFailureReason` for that purpose and handle it in some
>> > > special way in some places (like maybe hiding such rejected
>> > > checkpoints from the REST API/WebUI). We could elaborate on this a
>> > > bit more, but after brief thinking I could see it actually working
>> > > well enough without any public-facing changes. But I might be wrong
>> > > here.
>> > >
>> > > If this feature actually grabs traction, we could expand it to
>> something
>> > > more sophisticated available via a public API in the future.
>> > >
>> >
>> > In the target use-case, users still want to checkpoint (though at a
>> > larger interval) when there is backlog. And HybridSource needs to know
>> > the expected checkpoint interval during backlog in order to determine
>> > whether it should keep throwing CheckpointException. Thus, we still need
>> > to add execution.checkpointing.interval-during-backlog for users to
>> > specify this information.
>> >
>> > It seems that the only benefit of this approach is to avoid
>> > adding SplitEnumeratorContext#setIsProcessingBacklog.
>> >
>> > The downside of this approach is that it is hard to enforce the
>> > semantics specified by execution.checkpointing.interval-during-backlog.
>> > For example, suppose execution.checkpointing.interval = 3 minutes and
>> > execution.checkpointing.interval-during-backlog = 7 minutes. During the
>> > backlog phase, the checkpoint coordinator will still trigger a
>> > checkpoint once every 3 minutes. HybridSource will need to reject 2 out
>> > of every 3 checkpoint invocations, and the effective checkpoint interval
>> > will be 9 minutes.
>> >
>> > Overall, I think the solution is a bit hacky. I think it is preferred
>> > to throw an exception only when there is indeed an error. If we don't
>> > need to take a checkpoint, it is preferred to not trigger the checkpoint
>> > in the first place. And I think adding
>> > SplitEnumeratorContext#setIsProcessingBacklog is probably not that much
>> > of a big deal.
>> >
>> > Thanks for all the comments. I am looking forward to your thoughts.
>> >
>> > Best,
>> > Dong
>> >
>> >
>> > >
>> > > ===============
>> > >
>> > > Sorry for disturbing this FLIP discussion and voting.
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > On Thu, Jun 29, 2023 at 05:08 feng xiangyu <xiangyu...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Dong,
>> > > >
>> > > > Thanks for your quick reply. I think this has truly solved our
>> > > > problem and will enable us to upgrade our existing jobs more
>> > > > seamlessly.
>> > > >
>> > > > Best,
>> > > > Xiangyu
>> > > >
>> > > > Dong Lin <lindon...@gmail.com> wrote on Thu, Jun 29, 2023 at 10:50:
>> > > >
>> > > > > Hi Feng,
>> > > > >
>> > > > > Thanks for the feedback. Yes, you can configure the
>> > > > > execution.checkpointing.interval-during-backlog to effectively
>> > disable
>> > > > > checkpoint during backlog.
>> > > > >
>> > > > > Prior to your comment, the FLIP allowed users to do this by
>> > > > > setting the config value to something large (e.g. 365 days). After
>> > > > > thinking about this more, we think it is more usable to allow
>> > > > > users to achieve this goal by setting the config value to 0. This
>> > > > > is consistent with the existing behavior of
>> > > > > execution.checkpointing.interval -- checkpointing is disabled if
>> > > > > the user sets execution.checkpointing.interval to 0.
>> > > > >
>> > > > > We have updated the description of
>> > > > > execution.checkpointing.interval-during-backlog to say the
>> > > > > following:
>> > > > > ... if it is not null, the value must either be 0, which means
>> > > > > checkpointing is disabled during backlog, or be larger than or
>> > > > > equal to execution.checkpointing.interval.
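A sketch of that validation rule (hypothetical code, not the actual Flink implementation):

```python
# Sketch (hypothetical, not Flink code) of the rule quoted above:
# interval-during-backlog must be 0 (checkpointing disabled during backlog)
# or greater than or equal to the regular checkpointing interval.

def validate_interval_during_backlog(interval_ms: int, during_backlog_ms):
    if during_backlog_ms is None:
        return  # unset: fall back to the regular interval
    if during_backlog_ms != 0 and during_backlog_ms < interval_ms:
        raise ValueError(
            "execution.checkpointing.interval-during-backlog must be 0 or "
            ">= execution.checkpointing.interval")
```

For example, with a 3-minute regular interval, 0 and 7 minutes would be accepted, while 1 minute would be rejected.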
>> > > > >
>> > > > > Does this address your need?
>> > > > >
>> > > > > Best,
>> > > > > Dong
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu <
>> xiangyu...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi Dong and Yunfeng,
>> > > > > >
>> > > > > > Thanks for the proposal, your FLIP sounds very useful from my
>> > > > > > perspective. In our business, when using hybrid source in
>> > > > > > production we also met the problem described in your FLIP.
>> > > > > > In our solution, we tend to skip making any checkpoints before
>> > > > > > all batch tasks have finished and resume the periodic checkpoint
>> > > > > > only in the streaming phase. With this FLIP, we can solve our
>> > > > > > problem in a more generic way.
>> > > > > >
>> > > > > > However, I am wondering, if we still want to skip making any
>> > > > > > checkpoints during the historical phase, can we set the
>> > > > > > configuration "execution.checkpointing.interval-during-backlog"
>> > > > > > to "-1" to cover this case?
>> > > > > >
>> > > > > > Best,
>> > > > > > Xiangyu
>> > > > > >
>> > > > > > > Hang Ruan <ruanhang1...@gmail.com> wrote on Wed, Jun 28, 2023 at 16:30:
>> > > > > >
>> > > > > > > Thanks for Dong and Yunfeng's work.
>> > > > > > >
>> > > > > > > The FLIP looks good to me. This new version is easier to
>> > > > > > > understand.
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Hang
>> > > > > > >
>> > > > > > > Dong Lin <lindon...@gmail.com> wrote on Tue, Jun 27, 2023 at 16:53:
>> > > > > > >
>> > > > > > > > Thanks Jack, Jingsong, and Zhu for the review!
>> > > > > > > >
>> > > > > > > > Thanks Zhu for the suggestion. I have updated the
>> configuration
>> > > > name
>> > > > > as
>> > > > > > > > suggested.
>> > > > > > > >
>> > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com>
>> > > wrote:
>> > > > > > > >
>> > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving
>> > this
>> > > > > > > > discussion.
>> > > > > > > > >
>> > > > > > > > > The new design looks generally good to me. Increasing the
>> > > > > checkpoint
>> > > > > > > > > interval when the job is processing backlogs is easier for
>> > > users
>> > > > to
>> > > > > > > > > understand and can help in more scenarios.
>> > > > > > > > >
>> > > > > > > > > I have one comment about the new configuration.
>> > > > > > > > > Naming the new configuration
>> > > > > > > > > "execution.checkpointing.interval-during-backlog" would be
>> > > > > > > > > better according to the Flink config naming convention.
>> > > > > > > > > This is also because nested config keys should be avoided.
>> > > > > > > > > See FLINK-29372 for more details.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Zhu
>> > > > > > > > >
>> > > > > > > > > Jingsong Li <jingsongl...@gmail.com> wrote on Tue, Jun
>> > > > > > > > > 27, 2023 at 15:45:
>> > > > > > > > > >
>> > > > > > > > > > Looks good to me!
>> > > > > > > > > >
>> > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and
>> > design.
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Jingsong
>> > > > > > > > > >
>> > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <
>> imj...@gmail.com>
>> > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > Thank you Dong for driving this FLIP.
>> > > > > > > > > > >
>> > > > > > > > > > > The new design looks good to me!
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Jark
>> > > > > > > > > > >
>> > > > > > > > > > > > On Jun 27, 2023, at 14:38, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thank you Leonard for the review!
>> > > > > > > > > > > >
>> > > > > > > > > > > > Hi Piotr, do you have any comments on the latest
>> > > > > > > > > > > > proposal?
>> > > > > > > > > > > >
>> > > > > > > > > > > > I am wondering if it is OK to start the voting
>> > > > > > > > > > > > thread this week.
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <
>> > > > > xbjt...@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > >> Thanks Dong for driving this FLIP forward!
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Introducing  `backlog status` concept for flink job
>> > > makes
>> > > > > > sense
>> > > > > > > to
>> > > > > > > > > me as
>> > > > > > > > > > > >> following reasons:
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> From concept/API design perspective, it’s more
>> general
>> > > and
>> > > > > > > natural
>> > > > > > > > > than
>> > > > > > > > > > > >> above proposals as it can be used in HybridSource
>> for
>> > > > > bounded
>> > > > > > > > > records, CDC
>> > > > > > > > > > > >> Source for history snapshot and general sources
>> like
>> > > > > > KafkaSource
>> > > > > > > > for
>> > > > > > > > > > > >> historical messages.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> From the user cases/requirements side, I’ve seen many users
>> > > > > > > > > > > >> manually set a larger checkpoint interval during backfilling
>> > > > > > > > > > > >> and then a shorter checkpoint interval for real-time
>> > > > > > > > > > > >> processing in their production environments as a Flink
>> > > > > > > > > > > >> application optimization. Now, the Flink framework can make
>> > > > > > > > > > > >> this optimization no longer require the user to set the
>> > > > > > > > > > > >> checkpoint interval and restart the job multiple times.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Following the support for a larger checkpoint interval for
>> > > > > > > > > > > >> jobs under backlog status in the current FLIP, we can explore
>> > > > > > > > > > > >> supporting larger parallelism/memory/CPU for jobs under
>> > > > > > > > > > > >> backlog status in the future.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> In short, the updated FLIP looks good to me.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Best,
>> > > > > > > > > > > >> Leonard
>> > > > > > > > > > > >>
>> > > > > > > > > > > >>
>> > > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <
>> > > > > lindon...@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Hi Piotr,
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog
>> > > > concept.
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> After discussing with Becket Qin and thinking
>> about
>> > > this
>> > > > > > more,
>> > > > > > > I
>> > > > > > > > > agree it
>> > > > > > > > > > > >>> is a better idea to add a top-level concept to all
>> > > source
>> > > > > > > > > operators to
>> > > > > > > > > > > >>> address the target use-case.
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> The main reason that changed my mind is that isProcessingBacklog
>> > > > > > > > > > > >>> can be described as an inherent/natural attribute of every
>> > > > > > > > > > > >>> source instance, and its semantics do not need to depend on any
>> > > > > > > > > > > >>> specific checkpointing policy. Also, we can hardcode the
>> > > > > > > > > > > >>> isProcessingBacklog behavior for the sources we have considered
>> > > > > > > > > > > >>> so far (e.g. HybridSource and MySQL CDC source) without asking
>> > > > > > > > > > > >>> users to explicitly configure the per-source behavior, which
>> > > > > > > > > > > >>> indeed provides a better user experience.
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> I have updated the FLIP based on the latest
>> > > suggestions.
>> > > > > The
>> > > > > > > > > latest FLIP
>> > > > > > > > > > > >> no
>> > > > > > > > > > > >>> longer introduces per-source config that can be
>> used
>> > by
>> > > > > > > > end-users.
>> > > > > > > > > While
>> > > > > > > > > > > >> I
>> > > > > > > > > > > >>> agree with you that CheckpointTrigger can be a
>> useful
>> > > > > feature
>> > > > > > > to
>> > > > > > > > > address
>> > > > > > > > > > > >>> additional use-cases, I am not sure it is
>> necessary
>> > for
>> > > > the
>> > > > > > > > > use-case
>> > > > > > > > > > > >>> targeted by FLIP-309. Maybe we can introduce
>> > > > > > CheckpointTrigger
>> > > > > > > > > separately
>> > > > > > > > > > > >>> in another FLIP?
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Can you help take another look at the updated
>> FLIP?
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Best,
>> > > > > > > > > > > >>> Dong
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
>> > > > > > > > > pnowoj...@apache.org>
>> > > > > > > > > > > >>> wrote:
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>> Hi Dong,
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1%
>> > > > > > > > > > > >>>>> chance of being "backpressured" at a given time (due to
>> > > > > > > > > > > >>>>> random traffic spikes). Then at any given time, the chance
>> > > > > > > > > > > >>>>> of the job being considered not-backpressured is
>> > > > > > > > > > > >>>>> (1-0.01)^1000. Since we evaluate the backpressure metric
>> > > > > > > > > > > >>>>> once a second, the estimated time for the job to be
>> > > > > > > > > > > >>>>> considered not-backpressured is roughly 1 / ((1-0.01)^1000)
>> > > > > > > > > > > >>>>> = 23163 sec = 6.4 hours.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> This means that the job will effectively always
>> use
>> > > the
>> > > > > > > longer
>> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real
>> > concern,
>> > > > > > right?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Sorry I don't understand where you are getting
>> those
>> > > > > numbers
>> > > > > > > > from.
>> > > > > > > > > > > >>>> Instead of trying to find loophole after
>> loophole,
>> > > could
>> > > > > you
>> > > > > > > try
>> > > > > > > > > to
>> > > > > > > > > > > >> think
>> > > > > > > > > > > >>>> how a given loophole could be improved/solved?
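For reference, the estimate in question can be reproduced with a few lines of Python. This is only a sketch of the arithmetic already quoted above (1000 subtasks, 1% per-subtask backpressure chance, one evaluation per second); none of these numbers come from Flink itself:

```python
# Reproduce the back-of-the-envelope estimate from the example above:
# 1000 subtasks, each with an independent 1% chance of being backpressured
# at any given 1-second metric evaluation.
p_all_clear = (1 - 0.01) ** 1000   # P(no subtask is backpressured at one evaluation)
expected_wait_s = 1 / p_all_clear  # expected seconds until one all-clear sample

print(p_all_clear)                 # roughly 4.3e-05
print(expected_wait_s / 3600)      # roughly 6.4 hours, matching the 23163 sec figure
```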
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to
>> know
>> > the
>> > > > > APIs
>> > > > > > > due
>> > > > > > > > > to the
>> > > > > > > > > > > >>>>> following reasons.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Please propose something. I don't think it's
>> needed.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309
>> motivation
>> > > > > > section,
>> > > > > > > > > would the
>> > > > > > > > > > > >>>> APIs
>> > > > > > > > > > > >>>>> of this alternative approach be more or less
>> > usable?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Everything that you originally wanted to achieve
>> in
>> > > > > > FLIP-309,
>> > > > > > > > you
>> > > > > > > > > could
>> > > > > > > > > > > >> do
>> > > > > > > > > > > >>>> as well in my proposal.
>> > > > > > > > > > > >>>> Vide my many mentions of the "hacky solution".
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra
>> > use-case
>> > > > > (e.g.
>> > > > > > > > allow
>> > > > > > > > > > > >>>>> checkpointing interval to change dynamically
>> even
>> > > > during
>> > > > > > the
>> > > > > > > > > unbounded
>> > > > > > > > > > > >>>>> phase) as it claims?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> I don't see why not.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs
>> > currently
>> > > > > > > proposed
>> > > > > > > > in
>> > > > > > > > > > > >>>> FLIP-309?
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Yes
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> For example, if the APIs of this alternative
>> > approach
>> > > > can
>> > > > > > be
>> > > > > > > > > decoupled
>> > > > > > > > > > > >>>> from
>> > > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it
>> > > might
>> > > > be
>> > > > > > > > > reasonable to
>> > > > > > > > > > > >>>>> work on this extra use-case with a more
>> > > > > > advanced/complicated
>> > > > > > > > > design
>> > > > > > > > > > > >>>>> separately in a followup work.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> As I voiced my concerns previously, the current design of
>> > > > > > > > > > > >>>> FLIP-309 would clog the public API and in the long run
>> > > > > > > > > > > >>>> confuse the users. IMO it's addressing the problem in the
>> > > > > > > > > > > >>>> wrong place.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
>> > > > > > > > > > > >>>>> - Have all source operators emit a metric named
>> > > > > > > > > "processingBacklog".
>> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the
>> > > > > checkpointing
>> > > > > > > > > interval to
>> > > > > > > > > > > >> be
>> > > > > > > > > > > >>>>> used when any source is processing backlog".
>> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog"
>> > > periodically
>> > > > > from
>> > > > > > > all
>> > > > > > > > > source
>> > > > > > > > > > > >>>>> operators and uses the newly added config value
>> as
>> > > > > > > appropriate.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Yes.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> The challenge with this approach is that we
>> need to
>> > > > > define
>> > > > > > > the
>> > > > > > > > > > > >> semantics
>> > > > > > > > > > > >>>> of
>> > > > > > > > > > > >>>>> this "processingBacklog" metric and have all
>> source
>> > > > > > operators
>> > > > > > > > > > > >>>>> implement this metric. I am not sure we are
>> able to
>> > > do
>> > > > > this
>> > > > > > > yet
>> > > > > > > > > without
>> > > > > > > > > > > >>>>> having users explicitly provide this information
>> > on a
>> > > > > > > > per-source
>> > > > > > > > > basis.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source: should it emit
>> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job
>> > might
>> > > > use
>> > > > > > long
>> > > > > > > > > > > >>>> checkpointing
>> > > > > > > > > > > >>>>> interval even
>> > > > > > > > > > > >>>>> if the job is asked to process data starting
>> from
>> > now
>> > > > to
>> > > > > > the
>> > > > > > > > > next 1
>> > > > > > > > > > > >> hour.
>> > > > > > > > > > > >>>>> If no, then the job might use the short
>> > checkpointing
>> > > > > > > interval
>> > > > > > > > > > > >>>>> even if the job is asked to re-process data
>> > starting
>> > > > > from 7
>> > > > > > > > days
>> > > > > > > > > ago.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Yes. The same can be said of your proposal. It has the very
>> > > > > > > > > > > >>>> same issues: every source would have to implement it
>> > > > > > > > > > > >>>> differently, most sources would have no idea how to properly
>> > > > > > > > > > > >>>> calculate the newly requested checkpoint interval, and for
>> > > > > > > > > > > >>>> those that do know how to do that, the user would have to
>> > > > > > > > > > > >>>> configure every source individually. Yet again we would end
>> > > > > > > > > > > >>>> up with a system that works only partially, in some special
>> > > > > > > > > > > >>>> use cases (HybridSource), which confuses the users even
>> > > > > > > > > > > >>>> more.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> That's why I think the more generic solution, working
>> > > > > > > > > > > >>>> primarily on the same metrics that are used by various
>> > > > > > > > > > > >>>> auto-scaling solutions (like the Flink K8s operator's
>> > > > > > > > > > > >>>> autoscaler), would be better. I proposed the hacky solution
>> > > > > > > > > > > >>>> to:
>> > > > > > > > > > > >>>> 1. show you that the generic solution is simply a superset
>> > > > > > > > > > > >>>> of your proposal
>> > > > > > > > > > > >>>> 2. if you are adamant that busyness/backpressure/records
>> > > > > > > > > > > >>>> processing rate/pending records metrics wouldn't cover your
>> > > > > > > > > > > >>>> use case sufficiently (imo they can), then you can very
>> > > > > > > > > > > >>>> easily enhance this algorithm by using some hints from the
>> > > > > > > > > > > >>>> sources, like "processingBacklog==true" to short-circuit the
>> > > > > > > > > > > >>>> main algorithm, if `processingBacklog` is available.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Best,
>> > > > > > > > > > > >>>> Piotrek
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> pt., 16 cze 2023 o 04:45 Dong Lin <
>> > > lindon...@gmail.com>
>> > > > > > > > > napisał(a):
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>> Hi again Piotr,
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Thank you for the reply. Please see my reply
>> > inline.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski
>> <
>> > > > > > > > > > > >>>> piotr.nowoj...@gmail.com>
>> > > > > > > > > > > >>>>> wrote:
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> Hi again Dong,
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> I understand that JM will get the
>> > > > backpressure-related
>> > > > > > > > metrics
>> > > > > > > > > every
>> > > > > > > > > > > >>>>> time
>> > > > > > > > > > > >>>>>>> the RestServerEndpoint receives the REST
>> request
>> > to
>> > > > get
>> > > > > > > these
>> > > > > > > > > > > >>>> metrics.
>> > > > > > > > > > > >>>>>> But
>> > > > > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already
>> > > always
>> > > > > > > > > receiving the
>> > > > > > > > > > > >>>>> REST
>> > > > > > > > > > > >>>>>>> metrics at regular interval (suppose there is
>> no
>> > > > human
>> > > > > > > > manually
>> > > > > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it
>> > does,
>> > > > > what
>> > > > > > is
>> > > > > > > > the
>> > > > > > > > > > > >>>>> interval?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Good catch, I've thought that metrics are
>> > > > pre-emptively
>> > > > > > sent
>> > > > > > > > to
>> > > > > > > > > JM
>> > > > > > > > > > > >>>> every
>> > > > > > > > > > > >>>>> 10
>> > > > > > > > > > > >>>>>> seconds.
>> > > > > > > > > > > >>>>>> Indeed that's not the case at the moment, and
>> that
>> > > > would
>> > > > > > > have
>> > > > > > > > > to be
>> > > > > > > > > > > >>>>>> improved.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already
>> paying
>> > > this
>> > > > > much
>> > > > > > > > > overhead
>> > > > > > > > > > > >>>> just
>> > > > > > > > > > > >>>>>> for
>> > > > > > > > > > > >>>>>>> metrics monitoring. That is the main reason I
>> > still
>> > > > > doubt
>> > > > > > > it
>> > > > > > > > > is true.
>> > > > > > > > > > > >>>>> Can
>> > > > > > > > > > > >>>>>>> you show where this 100 ms is currently
>> > configured?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should
>> add
>> > > > extra
>> > > > > > code
>> > > > > > > > to
>> > > > > > > > > invoke
>> > > > > > > > > > > >>>>> the
>> > > > > > > > > > > >>>>>>> REST API at 100 ms interval. Then that means
>> we
>> > > need
>> > > > to
>> > > > > > > > > considerably
>> > > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where
>> > the
>> > > > > > overhead
>> > > > > > > > > will
>> > > > > > > > > > > >>>>> increase
>> > > > > > > > > > > >>>>>>> as the number of TM/slots increase, which may
>> > pose
>> > > > risk
>> > > > > > to
>> > > > > > > > the
>> > > > > > > > > > > >>>>>> scalability
>> > > > > > > > > > > >>>>>>> of the proposed design. I am not sure we
>> should
>> > do
>> > > > > this.
>> > > > > > > What
>> > > > > > > > > do you
>> > > > > > > > > > > >>>>>> think?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Sorry. I didn't mean the metric should be reported every
>> > > > > > > > > > > >>>>>> 100ms. I meant that "backPressuredTimeMsPerSecond (metric)
>> > > > > > > > > > > >>>>>> would report (a value of) 100ms/s" once per metric
>> > > > > > > > > > > >>>>>> interval (10s?).
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1%
>> > > > > > > > > > > >>>>> chance of being "backpressured" at a given time (due to
>> > > > > > > > > > > >>>>> random traffic spikes). Then at any given time, the chance
>> > > > > > > > > > > >>>>> of the job being considered not-backpressured is
>> > > > > > > > > > > >>>>> (1-0.01)^1000. Since we evaluate the backpressure metric
>> > > > > > > > > > > >>>>> once a second, the estimated time for the job to be
>> > > > > > > > > > > >>>>> considered not-backpressured is roughly 1 / ((1-0.01)^1000)
>> > > > > > > > > > > >>>>> = 23163 sec = 6.4 hours.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> This means that the job will effectively always
>> use
>> > > the
>> > > > > > > longer
>> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real
>> > concern,
>> > > > > > right?
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> - What is the interface of this
>> > CheckpointTrigger?
>> > > > For
>> > > > > > > > > example, are
>> > > > > > > > > > > >>>> we
>> > > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context
>> that it
>> > > can
>> > > > > use
>> > > > > > > to
>> > > > > > > > > fetch
>> > > > > > > > > > > >>>>>>> arbitrary metric values? This can help us
>> > > understand
>> > > > > what
>> > > > > > > > > information
>> > > > > > > > > > > >>>>>> this
>> > > > > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make
>> > the
>> > > > > > > checkpoint
>> > > > > > > > > > > >>>> decision.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> I honestly don't think this is important at
>> this
>> > > stage
>> > > > > of
>> > > > > > > the
>> > > > > > > > > > > >>>> discussion.
>> > > > > > > > > > > >>>>>> It could have
>> > > > > > > > > > > >>>>>> whatever interface we would deem to be best.
>> > > Required
>> > > > > > > things:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> - access to at least a subset of metrics that
>> the
>> > > > given
>> > > > > > > > > > > >>>>> `CheckpointTrigger`
>> > > > > > > > > > > >>>>>> requests,
>> > > > > > > > > > > >>>>>> for example via some registration mechanism,
>> so we
>> > > > don't
>> > > > > > > have
>> > > > > > > > to
>> > > > > > > > > > > >>>> fetch
>> > > > > > > > > > > >>>>>> all of the
>> > > > > > > > > > > >>>>>> metrics all the time from TMs.
>> > > > > > > > > > > >>>>>> - some way to influence
>> `CheckpointCoordinator`.
>> > > > Either
>> > > > > > via
>> > > > > > > > > manually
>> > > > > > > > > > > >>>>>> triggering
>> > > > > > > > > > > >>>>>> checkpoints, and/or ability to change the
>> > > > checkpointing
>> > > > > > > > > interval.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to
>> know
>> > the
>> > > > > APIs
>> > > > > > > due
>> > > > > > > > > to the
>> > > > > > > > > > > >>>>> following reasons.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> We would need to know the concrete APIs to gauge
>> > the
>> > > > > > > following:
>> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309
>> motivation
>> > > > > > section,
>> > > > > > > > > would the
>> > > > > > > > > > > >>>> APIs
>> > > > > > > > > > > >>>>> of this alternative approach be more or less
>> > usable?
>> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra
>> > use-case
>> > > > > (e.g.
>> > > > > > > > allow
>> > > > > > > > > > > >>>>> checkpointing interval to change dynamically
>> even
>> > > > during
>> > > > > > the
>> > > > > > > > > unbounded
>> > > > > > > > > > > >>>>> phase) as it claims?
>> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs
>> > currently
>> > > > > > > proposed
>> > > > > > > > in
>> > > > > > > > > > > >>>> FLIP-309?
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> For example, if the APIs of this alternative
>> > approach
>> > > > can
>> > > > > > be
>> > > > > > > > > decoupled
>> > > > > > > > > > > >>>> from
>> > > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it
>> > > might
>> > > > be
>> > > > > > > > > reasonable to
>> > > > > > > > > > > >>>>> work on this extra use-case with a more
>> > > > > > advanced/complicated
>> > > > > > > > > design
>> > > > > > > > > > > >>>>> separately in a followup work.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For
>> > > > example,
>> > > > > > is
>> > > > > > > it
>> > > > > > > > > going
>> > > > > > > > > > > >>>> to
>> > > > > > > > > > > >>>>>> run
>> > > > > > > > > > > >>>>>>> on the subtask of every source operator? Or
>> is it
>> > > > going
>> > > > > > to
>> > > > > > > > run
>> > > > > > > > > on the
>> > > > > > > > > > > >>>>> JM?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> IMO on the JM.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> - Are we going to provide a default
>> > implementation
>> > > of
>> > > > > > this
>> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the
>> > > > > algorithm
>> > > > > > > > > described
>> > > > > > > > > > > >>>>> below,
>> > > > > > > > > > > >>>>>>> or do we expect each source operator
>> developer to
>> > > > > > implement
>> > > > > > > > > their own
>> > > > > > > > > > > >>>>>>> CheckpointTrigger?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> As I mentioned before, I think we should
>> provide
>> > at
>> > > > the
>> > > > > > very
>> > > > > > > > > least the
>> > > > > > > > > > > >>>>>> implementation
>> > > > > > > > > > > >>>>>> that replaces the current triggering mechanism
>> > > > > (statically
>> > > > > > > > > configured
>> > > > > > > > > > > >>>>>> checkpointing interval)
>> > > > > > > > > > > >>>>>> and it would be great to provide the
>> backpressure
>> > > > > > monitoring
>> > > > > > > > > trigger
>> > > > > > > > > > > >> as
>> > > > > > > > > > > >>>>>> well.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I agree that if there is a good use-case that
>> can
>> > be
>> > > > > > > addressed
>> > > > > > > > > by the
>> > > > > > > > > > > >>>>> proposed CheckpointTrigger, then it is
>> reasonable
>> > > > > > > > > > > >>>>> to add CheckpointTrigger and replace the current
>> > > > > triggering
>> > > > > > > > > mechanism
>> > > > > > > > > > > >>>> with
>> > > > > > > > > > > >>>>> it.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I also agree that we will likely find such a
>> > > use-case.
>> > > > > For
>> > > > > > > > > example,
>> > > > > > > > > > > >>>> suppose
>> > > > > > > > > > > >>>>> the source records have event timestamps, then
>> it
>> > is
>> > > > > likely
>> > > > > > > > > > > >>>>> that we can use the trigger to dynamically
>> control
>> > > the
>> > > > > > > > > checkpointing
>> > > > > > > > > > > >>>>> interval based on the difference between the
>> > > watermark
>> > > > > and
>> > > > > > > > > current
>> > > > > > > > > > > >> system
>> > > > > > > > > > > >>>>> time.
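To make that idea concrete, such a trigger could pick the interval from the watermark lag roughly along these lines. This is a hypothetical sketch: `choose_interval`, the threshold, and the interval values are illustrative names picked for this example, not a Flink API:

```python
def choose_interval(watermark_ms: int, now_ms: int,
                    fast_interval_ms: int = 1_000,
                    slow_interval_ms: int = 30_000,
                    backlog_lag_ms: int = 5 * 60 * 1_000) -> int:
    """Use the slow interval while the watermark trails wall-clock time by
    more than the backlog threshold, i.e. while the job is still catching up."""
    lag_ms = now_ms - watermark_ms
    return slow_interval_ms if lag_ms > backlog_lag_ms else fast_interval_ms

now = 1_700_000_000_000
# Replaying 7 days of history: huge watermark lag, so checkpoint slowly.
assert choose_interval(now - 7 * 24 * 3600 * 1000, now) == 30_000
# Caught up to within 2 seconds of real time: checkpoint fast again.
assert choose_interval(now - 2_000, now) == 1_000
```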
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> But I am not sure the addition of this
>> > > > CheckpointTrigger
>> > > > > > > should
>> > > > > > > > > be
>> > > > > > > > > > > >>>> coupled
>> > > > > > > > > > > >>>>> with FLIP-309. Whether or not it is coupled
>> > probably
>> > > > > > depends
>> > > > > > > on
>> > > > > > > > > the
>> > > > > > > > > > > >>>>> concrete API design around CheckpointTrigger.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> If you would be adamant that the backpressure monitoring
>> > > > > > > > > > > >>>>>> doesn't cover your use case well enough, I would be ok to
>> > > > > > > > > > > >>>>>> provide the hacky version that I also mentioned before:
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> """
>> > > > > > > > > > > >>>>>> Especially that if my proposed algorithm wouldn't work
>> > > > > > > > > > > >>>>>> well enough, there is
>> > > > > > > > > > > >>>>>> an obvious solution, that any source could add
>> a
>> > > > metric,
>> > > > > > > like
>> > > > > > > > > let say
>> > > > > > > > > > > >>>>>> "processingBacklog: true/false", and the
>> > > > > > `CheckpointTrigger`
>> > > > > > > > > > > >>>>>> could use this as an override to always switch
>> to
>> > > the
>> > > > > > > > > > > >>>>>> "slowCheckpointInterval". I don't think we need
>> > it,
>> > > > but
>> > > > > > > that's
>> > > > > > > > > always
>> > > > > > > > > > > >>>> an
>> > > > > > > > > > > >>>>>> option
>> > > > > > > > > > > >>>>>> that would be basically equivalent to your
>> > original
>> > > > > > > proposal.
>> > > > > > > > > > > >>>>>> """
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
>> > > > > > > > > > > >>>>> - Have all source operators emit a metric named
>> > > > > > > > > "processingBacklog".
>> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the
>> > > > > checkpointing
>> > > > > > > > > interval to
>> > > > > > > > > > > >> be
>> > > > > > > > > > > >>>>> used when any source is processing backlog".
>> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog"
>> > > periodically
>> > > > > from
>> > > > > > > all
>> > > > > > > > > source
>> > > > > > > > > > > >>>>> operators and uses the newly added config value
>> as
>> > > > > > > appropriate.
>> > > > > > > > > > > >>>>>
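A minimal sketch of that JM-side aggregation logic, just to pin down the described semantics (the function and argument names are placeholders for this illustration, not actual Flink APIs; the interval values echo the 3-minute/longer-interval example discussed earlier in the thread):

```python
def effective_checkpoint_interval(source_backlog_flags, regular_interval_ms, backlog_interval_ms):
    """JM-side decision: use the backlog interval while any source still
    reports processingBacklog=true, otherwise use the regular interval."""
    return backlog_interval_ms if any(source_backlog_flags) else regular_interval_ms

# One source (e.g. a HybridSource still in its bounded phase) reports backlog
# -> the job checkpoints at the longer interval.
assert effective_checkpoint_interval([True, False], 180_000, 1_800_000) == 1_800_000
# All sources caught up -> back to the regular interval.
assert effective_checkpoint_interval([False, False], 180_000, 1_800_000) == 180_000
```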
>> > > > > > > > > > > >>>>> The challenge with this approach is that we
>> need to
>> > > > > define
>> > > > > > > the
>> > > > > > > > > > > >> semantics
>> > > > > > > > > > > >>>> of
>> > > > > > > > > > > >>>>> this "processingBacklog" metric and have all
>> source
>> > > > > > operators
>> > > > > > > > > > > >>>>> implement this metric. I am not sure we are
>> able to
>> > > do
>> > > > > this
>> > > > > > > yet
>> > > > > > > > > without
>> > > > > > > > > > > >>>>> having users explicitly provide this information
>> > on a
>> > > > > > > > per-source
>> > > > > > > > > basis.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source: should it emit
>> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job
>> > might
>> > > > use
>> > > > > > long
>> > > > > > > > > > > >>>> checkpointing
>> > > > > > > > > > > >>>>> interval even
>> > > > > > > > > > > >>>>> if the job is asked to process data starting
>> from
>> > now
>> > > > to
>> > > > > > the
>> > > > > > > > > next 1
>> > > > > > > > > > > >> hour.
>> > > > > > > > > > > >>>>> If no, then the job might use the short
>> > checkpointing
>> > > > > > > interval
>> > > > > > > > > > > >>>>> even if the job is asked to re-process data
>> > starting
>> > > > > from 7
>> > > > > > > > days
>> > > > > > > > > ago.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> - How can users specify the
>> > > > > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
>> > > > > > > > > > > >>>>>>> For example, will we provide APIs on the
>> > > > > > CheckpointTrigger
>> > > > > > > > that
>> > > > > > > > > > > >>>>> end-users
>> > > > > > > > > > > >>>>>>> can use to specify the checkpointing interval?
>> > What
>> > > > > would
>> > > > > > > > that
>> > > > > > > > > look
>> > > > > > > > > > > >>>>> like?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Also as I mentioned before, just like metric
>> > > reporters
>> > > > > are
>> > > > > > > > > configured:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
>> > > > > > > > > > > >>>>>> Every CheckpointTrigger could have its own
>> custom
>> > > > > > > > configuration.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
>> > > approach
>> > > > > > based
>> > > > > > > > on
>> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Yes, as usual, more generic things are more
>> > > > complicated,
>> > > > > > but
>> > > > > > > > > often
>> > > > > > > > > > > >> more
>> > > > > > > > > > > >>>>>> useful in the long run.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> and harder to use.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> I don't agree. Why setting in config
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> execution.checkpointing.trigger:
>> > > > > > > > > > > >>>> BackPressureMonitoringCheckpointTrigger
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
>> > > > > > > > > > > >>>>>> 1s
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
>> > > > > > > > > > > >>>>>> 30s
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> that we could even provide a shortcut to the
>> above
>> > > > > > construct
>> > > > > > > > > via:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s
>> > > > > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> is harder compared to setting two/three
>> checkpoint
>> > > > > > > intervals,
>> > > > > > > > > one in
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>> config/or via `env.enableCheckpointing(x)`,
>> > > > > > > > > > > >>>>>> secondly passing one/two (fast/slow) values on
>> the
>> > > > > source
>> > > > > > > > > itself?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> If we can address the use-case by providing just the two job-level
>> > > > > > > > > > > >>>>> configs as described above, I agree it will indeed be simpler.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I have tried to achieve this goal. But the caveat is that it requires
>> > > > > > > > > > > >>>>> much more work than described above in order to give the configs
>> > > > > > > > > > > >>>>> well-defined semantics. So I find it simpler to just use the approach
>> > > > > > > > > > > >>>>> in FLIP-309.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Let me explain my concern below. It will be great if you or someone
>> > > > > > > > > > > >>>>> else can help provide a solution.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> 1) We need to clearly document when the fast-interval and
>> > > > > > > > > > > >>>>> slow-interval will be used so that users can derive the expected
>> > > > > > > > > > > >>>>> behavior of the job and be able to configure these values.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> 2) The trigger of fast/slow interval depends on the behavior of the
>> > > > > > > > > > > >>>>> source (e.g. MySQL CDC, HybridSource). However, no existing concept
>> > > > > > > > > > > >>>>> of the source operator (e.g. boundedness) can describe the target
>> > > > > > > > > > > >>>>> behavior. For example, MySQL CDC internally has two phases, namely
>> > > > > > > > > > > >>>>> the snapshot phase and the binlog phase, which are not explicitly
>> > > > > > > > > > > >>>>> exposed to its users via the source operator API. And we probably
>> > > > > > > > > > > >>>>> should not enumerate all internal phases of all source operators
>> > > > > > > > > > > >>>>> that are affected by fast/slow interval.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> 3) An alternative approach might be to define a new concept (e.g.
>> > > > > > > > > > > >>>>> processingBacklog) that is applied to all source operators. Then the
>> > > > > > > > > > > >>>>> fast/slow interval's documentation can depend on this concept. That
>> > > > > > > > > > > >>>>> means we have to add a top-level concept (similar to source
>> > > > > > > > > > > >>>>> boundedness) and require all source operators to specify how they
>> > > > > > > > > > > >>>>> enforce this concept (e.g. FileSystemSource always emits
>> > > > > > > > > > > >>>>> processingBacklog=true). And there might be cases where the source
>> > > > > > > > > > > >>>>> itself (e.g. a bounded Kafka Source) can not automatically derive
>> > > > > > > > > > > >>>>> the value of this concept, in which case we need to provide an
>> > > > > > > > > > > >>>>> option for users to explicitly specify the value for this concept
>> > > > > > > > > > > >>>>> on a per-source basis.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> And it probably also has the issues of "having two places to
>> > > > > > > > > > > >>>>>>> configure checkpointing interval" and "giving flexibility for
>> > > > > > > > > > > >>>>>>> every source to implement a different API" (as mentioned below).
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> No, it doesn't.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> IMO, it is a hard-requirement for the user-facing API to be
>> > > > > > > > > > > >>>>>>> clearly defined, and users should be able to use the API without
>> > > > > > > > > > > >>>>>>> concern of regression. And this requirement is more important than
>> > > > > > > > > > > >>>>>>> the other goals discussed above because it is related to the
>> > > > > > > > > > > >>>>>>> stability/performance of the production job. What do you think?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> I don't agree with this. There are many things that work somewhere
>> > > > > > > > > > > >>>>>> in between perfectly and well enough in some fraction of use cases
>> > > > > > > > > > > >>>>>> (maybe in 99%, maybe 95% or maybe 60%), while still being very
>> > > > > > > > > > > >>>>>> useful. Good examples are: selection of state backend, unaligned
>> > > > > > > > > > > >>>>>> checkpoints, buffer debloating. Frankly, if I go through the list
>> > > > > > > > > > > >>>>>> of currently available config options, something like half of them
>> > > > > > > > > > > >>>>>> can cause regressions. Heck, even Flink itself doesn't work
>> > > > > > > > > > > >>>>>> perfectly in 100% of the use cases, due to a variety of design
>> > > > > > > > > > > >>>>>> choices. Of course, the more use cases are fine with said feature,
>> > > > > > > > > > > >>>>>> the better, but we shouldn't fixate on perfectly covering 100% of
>> > > > > > > > > > > >>>>>> the cases, as that's impossible.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> In this particular case, if the back pressure monitoring trigger
>> > > > > > > > > > > >>>>>> can work well enough in 95% of cases, I would say that's already
>> > > > > > > > > > > >>>>>> better than the originally proposed alternative, which doesn't work
>> > > > > > > > > > > >>>>>> at all if the user has a large backlog to reprocess from Kafka,
>> > > > > > > > > > > >>>>>> including when using HybridSource AFTER the switch to Kafka has
>> > > > > > > > > > > >>>>>> happened. For the remaining 5%, we should try to improve the
>> > > > > > > > > > > >>>>>> behaviour over time, but ultimately, users can decide to just run a
>> > > > > > > > > > > >>>>>> fixed checkpoint interval (or at worst use the hacky checkpoint
>> > > > > > > > > > > >>>>>> trigger that I mentioned before a couple of times).
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Also to be pedantic, if a user naively sets the slow-interval in
>> > > > > > > > > > > >>>>>> your proposal to 30 minutes, while that user's job fails on average
>> > > > > > > > > > > >>>>>> every 15-20 minutes, the job can end up in a state where it can not
>> > > > > > > > > > > >>>>>> make any progress; this arguably is quite a serious regression.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I probably should not say it is a "hard requirement". After all,
>> > > > > > > > > > > >>>>> there are pros/cons. We will need to consider implementation
>> > > > > > > > > > > >>>>> complexity, usability, extensibility etc.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I just don't think we should take it for granted to introduce
>> > > > > > > > > > > >>>>> regression for one use-case in order to support another use-case.
>> > > > > > > > > > > >>>>> If we can not find an algorithm/solution that addresses both
>> > > > > > > > > > > >>>>> use-cases well, I hope we can be open to tackling them separately
>> > > > > > > > > > > >>>>> so that users can choose the option that best fits their needs.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> All things else being equal, I think it is preferred for a
>> > > > > > > > > > > >>>>> user-facing API to be clearly defined, and users should be able to
>> > > > > > > > > > > >>>>> use the API without concern of regression.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Maybe we can list the pros/cons of the alternative approaches we
>> > > > > > > > > > > >>>>> have been discussing and choose the best approach. And maybe we
>> > > > > > > > > > > >>>>> will end up finding that the use-case which needs CheckpointTrigger
>> > > > > > > > > > > >>>>> can be tackled separately from the use-case in FLIP-309.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
>> > > > > > > > > > > >>>>>>> backPressuredTimeMsPerSecond = 0, then
>> > > > > > > > > > > >>>>>>> maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
>> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
>> > > > > > > > > > > >>>>>>> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond /
>> > > > > > > > > > > >>>>>>> 1000)) * metricsUpdateInterval"?
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> It looks like there is indeed some mistake in my proposal above.
>> > > > > > > > > > > >>>>>> Yours looks more correct; it probably still needs some
>> > > > > > > > > > > >>>>>> safeguard/special handling if `backPressuredTimeMsPerSecond > 950`.
>> > > > > > > > > > > >>>>>>
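To make the corrected formula and the suggested safeguard concrete, here is a minimal sketch (the function name, the 950 ms clamp value, and interpreting metricsUpdateInterval in seconds are illustrative assumptions, not an existing Flink API):

```python
def max_records_without_backpressure(num_records_in_per_second,
                                     back_pressured_time_ms_per_second,
                                     metrics_update_interval_s,
                                     clamp_ms=950):
    """Estimate how many records a subtask can consume per metrics update
    interval without becoming backpressured, per the formula above."""
    # Safeguard: clamp the backpressured time so the denominator never
    # approaches zero when the subtask is almost fully backpressured.
    bp_ms = min(back_pressured_time_ms_per_second, clamp_ms)
    non_backpressured_fraction = 1.0 - bp_ms / 1000.0
    return (num_records_in_per_second / non_backpressured_fraction) \
        * metrics_update_interval_s
```

With backPressuredTimeMsPerSecond = 0 this reduces to numRecordsInPerSecond * metricsUpdateInterval, matching the zero-backpressure case discussed above.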
>> > > > > > > > > > > >>>>>>> The only information it can access is the backlog.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Again no. It can access whatever we want to provide to it.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Regarding the rest of your concerns: it's a matter of tweaking the
>> > > > > > > > > > > >>>>>> parameters and the algorithm itself, and how much safety-net we
>> > > > > > > > > > > >>>>>> want to have. Ultimately, I'm pretty sure that's a (for 95-99% of
>> > > > > > > > > > > >>>>>> cases) solvable problem. If not, there is always the hacky
>> > > > > > > > > > > >>>>>> solution, which could even be integrated into the above mentioned
>> > > > > > > > > > > >>>>>> algorithm as a short circuit to always reach `slow-interval`.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Apart from that, you picked 3 minutes as the checkpointing
>> > > > > > > > > > > >>>>>> interval in your counter example. In most cases any interval above
>> > > > > > > > > > > >>>>>> 1 minute would inflict pretty negligible overheads, so all in all,
>> > > > > > > > > > > >>>>>> I would doubt there is a significant benefit (in most cases) of
>> > > > > > > > > > > >>>>>> increasing a 3 minute checkpoint interval to anything more, let
>> > > > > > > > > > > >>>>>> alone 30 minutes.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I am not sure we should design the algorithm with the assumption
>> > > > > > > > > > > >>>>> that the short checkpointing interval will always be higher than 1
>> > > > > > > > > > > >>>>> minute etc.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> I agree the proposed algorithm can solve most cases where the
>> > > > > > > > > > > >>>>> resource is sufficient and there is never a backlog in source
>> > > > > > > > > > > >>>>> subtasks. On the other hand, what makes SRE life hard is probably
>> > > > > > > > > > > >>>>> the remaining 1-5% of cases where the traffic is spiky and the
>> > > > > > > > > > > >>>>> cluster is reaching its capacity limit. The ability to predict and
>> > > > > > > > > > > >>>>> control a Flink job's behavior (including the checkpointing
>> > > > > > > > > > > >>>>> interval) can considerably reduce the burden of managing Flink jobs.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Best,
>> > > > > > > > > > > >>>>> Dong
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Best,
>> > > > > > > > > > > >>>>>> Piotrek
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> On Sat, Jun 3, 2023 at 5:44 AM Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> Hi Piotr,
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Thanks for the explanations. I have some followup questions below.
>> > > > > > > > > below.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org>
>> > > > > > > > > > > >>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>> Hi All,
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Thanks for chipping in the discussion, Ahmed!
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Regarding using the REST API: currently I'm leaning towards
>> > > > > > > > > > > >>>>>>>> implementing this feature inside Flink itself, via some pluggable
>> > > > > > > > > > > >>>>>>>> interface. The REST API solution would be tempting, but I guess
>> > > > > > > > > > > >>>>>>>> not everyone is using the Flink Kubernetes Operator.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> @Dong
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>> I am not sure metrics such as isBackPressured are already sent
>> > > > > > > > > > > >>>>>>>>> to JM.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Fetching code path on the JM:
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
>> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Example code path accessing Task level metrics via JM using the
>> > > > > > > > > > > >>>>>>>> `MetricStore`:
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Thanks for the code reference. I checked the code that invoked
>> > > > > > > > > > > >>>>>>> these two classes and found the following information:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> - AggregatingSubtasksMetricsHandler#getStores is currently invoked
>> > > > > > > > > > > >>>>>>> only when AggregatingJobsMetricsHandler is invoked.
>> > > > > > > > > > > >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned
>> > > > > > > > > > > >>>>>>> by WebMonitorEndpoint#initializeHandlers
>> > > > > > > > > > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by
>> > > > > > > > > > > >>>>>>> RestServerEndpoint. And RestServerEndpoint invokes these handlers
>> > > > > > > > > > > >>>>>>> in response to external REST requests.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I understand that JM will get the backpressure-related metrics
>> > > > > > > > > > > >>>>>>> every time the RestServerEndpoint receives the REST request to get
>> > > > > > > > > > > >>>>>>> these metrics. But I am not sure if RestServerEndpoint is already
>> > > > > > > > > > > >>>>>>> always receiving such REST requests at a regular interval (suppose
>> > > > > > > > > > > >>>>>>> there is no human manually opening/clicking the Flink Web UI). And
>> > > > > > > > > > > >>>>>>> if it does, what is the interval?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>> For example, let's say every source operator subtask reports
>> > > > > > > > > > > >>>>>>>>> this metric to JM once every 10 seconds. There are 100 source
>> > > > > > > > > > > >>>>>>>>> subtasks. And each subtask is backpressured roughly 10% of the
>> > > > > > > > > > > >>>>>>>>> total time due to traffic spikes (and limited buffer). Then at
>> > > > > > > > > > > >>>>>>>>> any given time, there is a 1 - 0.9^100 = 99.997% chance that at
>> > > > > > > > > > > >>>>>>>>> least one subtask is backpressured. Then we have to wait for at
>> > > > > > > > > > > >>>>>>>>> least 10 seconds to check again.
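As a quick sanity check of the arithmetic in the quoted example (a throwaway calculation, not part of any proposal):

```python
# Probability that at least one of 100 subtasks is backpressured at a
# sampled instant, if each is independently backpressured 10% of the time.
p_at_least_one = 1 - 0.9 ** 100
print(f"{p_at_least_one:.5%}")  # roughly 99.997%
```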
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like
>> > > > > > > > > > > >>>>>>>> busyTimeMsPerSecond) are not subject to that problem. They are
>> > > > > > > > > > > >>>>>>>> recalculated once every metric fetching interval, and they report
>> > > > > > > > > > > >>>>>>>> accurately, on average, how long the given subtask spent
>> > > > > > > > > > > >>>>>>>> busy/idling/backpressured. In your example,
>> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond would report 100ms/s.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Suppose every subtask is already reporting
>> > > > > > > > > > > >>>>>>> backPressuredTimeMsPerSecond to JM once every 100 ms. If a job has
>> > > > > > > > > > > >>>>>>> 10 operators (that are not chained) and each operator has 100
>> > > > > > > > > > > >>>>>>> subtasks, then JM would need to handle 10000 requests per second
>> > > > > > > > > > > >>>>>>> to receive metrics from these 1000 subtasks. It seems like a
>> > > > > > > > > > > >>>>>>> non-trivial overhead for medium-to-large sized jobs and can make
>> > > > > > > > > > > >>>>>>> JM the performance bottleneck during job execution.
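The load estimate above works out as follows (a back-of-the-envelope check using only the numbers assumed in this email):

```python
operators = 10           # non-chained operators, as assumed above
subtasks_per_operator = 100
report_interval_s = 0.1  # one report per subtask every 100 ms

total_subtasks = operators * subtasks_per_operator
reports_per_second = total_subtasks / report_interval_s
print(total_subtasks, reports_per_second)  # 1000 subtasks, 10000.0 reports/s
```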
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying this much overhead
>> > > > > > > > > > > >>>>>>> just for metrics monitoring. That is the main reason I still doubt
>> > > > > > > > > > > >>>>>>> it is true. Can you show where this 100 ms is currently configured?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra code to
>> > > > > > > > > > > >>>>>>> invoke the REST API at a 100 ms interval. Then that means we need
>> > > > > > > > > > > >>>>>>> to considerably increase the network/cpu overhead at JM, where the
>> > > > > > > > > > > >>>>>>> overhead will increase as the number of TM/slots increases, which
>> > > > > > > > > > > >>>>>>> may pose a risk to the scalability of the proposed design. I am
>> > > > > > > > > > > >>>>>>> not sure we should do this. What do you think?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>> While it will be nice to support additional use-cases with one
>> > > > > > > > > > > >>>>>>>>> proposal, it is probably also reasonable to make incremental
>> > > > > > > > > > > >>>>>>>>> progress and support the low-hanging-fruit use-case first. The
>> > > > > > > > > > > >>>>>>>>> choice really depends on the complexity and the importance of
>> > > > > > > > > > > >>>>>>>>> supporting the extra use-cases.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> That would be true if that was a private implementation detail,
>> > > > > > > > > > > >>>>>>>> or if the low-hanging-fruit solution were on the direct path to
>> > > > > > > > > > > >>>>>>>> the final solution. That's unfortunately not the case here. This
>> > > > > > > > > > > >>>>>>>> will add a public-facing API that we will later need to maintain,
>> > > > > > > > > > > >>>>>>>> no matter what the final solution will be, and at the moment at
>> > > > > > > > > > > >>>>>>>> least I don't see it being related to a "perfect" solution.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Sure. Then let's decide the final solution first.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>> I guess the point is that the suggested approach, which
>> > > > > > > > > > > >>>>>>>>> dynamically determines the checkpointing interval based on the
>> > > > > > > > > > > >>>>>>>>> backpressure, may cause regression when the checkpointing
>> > > > > > > > > > > >>>>>>>>> interval is relatively low. This makes it hard for users to
>> > > > > > > > > > > >>>>>>>>> enable this feature in production. It is like an auto-driving
>> > > > > > > > > > > >>>>>>>>> system that is not guaranteed to work
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Yes, creating a more generic solution that would require less
>> > > > > > > > > > > >>>>>>>> configuration is usually more difficult than static
>> > > > > > > > > > > >>>>>>>> configurations. It doesn't mean we shouldn't try to do that.
>> > > > > > > > > > > >>>>>>>> Especially if my proposed algorithm wouldn't work well enough,
>> > > > > > > > > > > >>>>>>>> there is an obvious solution: any source could add a metric, like
>> > > > > > > > > > > >>>>>>>> let's say "processingBacklog: true/false", and the
>> > > > > > > > > > > >>>>>>>> `CheckpointTrigger` could use this as an override to always
>> > > > > > > > > > > >>>>>>>> switch to the "slowCheckpointInterval". I don't think we need it,
>> > > > > > > > > > > >>>>>>>> but that's always an option that would be basically equivalent to
>> > > > > > > > > > > >>>>>>>> your original proposal. Or the source could even add
>> > > > > > > > > > > >>>>>>>> "suggestedCheckpointInterval: int", and the `CheckpointTrigger`
>> > > > > > > > > > > >>>>>>>> could use that value, if present, as a hint in one way or another.
>> > > > > > > > > > > >>>>>>>>
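The override sketched in the paragraph above could look roughly like this (purely illustrative: the function name, the boolean backlog flag, and the 500 ms threshold are assumptions, not an existing CheckpointTrigger API):

```python
def choose_checkpoint_interval(processing_backlog,
                               back_pressured_time_ms_per_second,
                               fast_interval_s, slow_interval_s,
                               backpressure_threshold_ms=500):
    # Source-reported backlog flag acts as an override: while the backlog
    # is being processed, always use the slow interval.
    if processing_backlog:
        return slow_interval_s
    # Otherwise fall back to a back-pressure-based heuristic (the threshold
    # is an arbitrary illustrative value).
    if back_pressured_time_ms_per_second > backpressure_threshold_ms:
        return slow_interval_s
    return fast_interval_s
```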
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> So far we have talked about the possibility of using
>> > > > > > > > > > > >>>>>>> CheckpointTrigger, and mentioned that the CheckpointTrigger can
>> > > > > > > > > > > >>>>>>> read metric values.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Can you help answer the following questions so that I can
>> > > > > > > > > > > >>>>>>> understand the alternative solution more concretely:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For example,
>> > > > > > > > > > > >>>>>>> are we going to give CheckpointTrigger a context that it can use
>> > > > > > > > > > > >>>>>>> to fetch arbitrary metric values? This can help us understand what
>> > > > > > > > > > > >>>>>>> information this user-defined CheckpointTrigger can use to make
>> > > > > > > > > > > >>>>>>> the checkpoint decision.
>> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it
>> > > > > > > > > > > >>>>>>> going to run on the subtask of every source operator? Or is it
>> > > > > > > > > > > >>>>>>> going to run on the JM?
>> > > > > > > > > > > >>>>>>> - Are we going to provide a default implementation of this
>> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the algorithm described
>> > > > > > > > > > > >>>>>>> below, or do we expect each source operator developer to implement
>> > > > > > > > > > > >>>>>>> their own CheckpointTrigger?
>> > > > > > > > > > > >>>>>>> - How can users specify the
>> > > > > > > > > > > >>>>>>> fastCheckpointInterval/slowCheckpointInterval? For example, will
>> > > > > > > > > > > >>>>>>> we provide APIs on the CheckpointTrigger that end-users can use to
>> > > > > > > > > > > >>>>>>> specify the checkpointing interval? What would that look like?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Overall, my gut feeling is that the alternative approach based on
>> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated and harder to use. And it
>> > > > > > > > > > > >>>>>>> probably also has the issues of "having two places to configure
>> > > > > > > > > > > >>>>>>> the checkpointing interval" and "giving flexibility for every
>> > > > > > > > > > > >>>>>>> source to implement a different API" (as mentioned below).
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Maybe we can evaluate it more after knowing the answers to the
>> > > > > > > > > > > >>>>>>> above questions.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>> On the other hand, the approach currently proposed in the FLIP is
>> > > > > > > > > > > >>>>>>>>> much simpler, as it does not depend on backpressure. Users specify
>> > > > > > > > > > > >>>>>>>>> the extra interval requirement on the specific sources (e.g.
>> > > > > > > > > > > >>>>>>>>> HybridSource, MySQL CDC Source) and can easily know the
>> > > > > > > > > > > >>>>>>>>> checkpointing interval that will be used in the continuous phase
>> > > > > > > > > > > >>>>>>>>> of the corresponding source. This is pretty much the same as how
>> > > > > > > > > > > >>>>>>>>> users use the existing execution.checkpointing.interval config. So
>> > > > > > > > > > > >>>>>>>>> there is no extra concern of regression caused by this approach.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> To an extent, but as I have already previously mentioned, I really
>> > > > > > > > > > > >>>>>>>> really do not like the idea of:
>> > > > > > > > > > > >>>>>>>> - having two places to configure the checkpointing interval (config
>> > > > > > > > > > > >>>>>>>> file and in the Source builders)
>> > > > > > > > > > > >>>>>>>> - giving flexibility for every source to implement a different API
>> > > > > > > > > > > >>>>>>>> for that purpose
>> > > > > > > > > > > >>>>>>>> - creating a solution that is not generic enough, so that we will
>> > > > > > > > > > > >>>>>>>> need a completely different mechanism in the future anyway
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Yeah, I understand different developers might have different
>> > > > > > > > > > > >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a
>> > > > > > > > > > > >>>>>>> perfect solution and we have to choose based on the pros/cons of
>> > > > > > > > > > > >>>>>>> these solutions.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I agree with you that, all things being equal, it is preferable to
>> > > > > > > > > > > >>>>>>> 1) have one place to configure checkpointing intervals, 2) have all
>> > > > > > > > > > > >>>>>>> source operators use the same API, and 3) create a solution that is
>> > > > > > > > > > > >>>>>>> generic and long-lasting. Note that these three goals affect the
>> > > > > > > > > > > >>>>>>> usability and extensibility of the API, but not necessarily the
>> > > > > > > > > > > >>>>>>> stability/performance of the production job.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> BTW, there are also other preferable goals. For example, it is very
>> > > > > > > > > > > >>>>>>> useful for the job's behavior to be predictable and interpretable so
>> > > > > > > > > > > >>>>>>> that SREs can operate/debug Flink in an easier way. We can list
>> > > > > > > > > > > >>>>>>> these pros/cons altogether later.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I am wondering if we can first agree on the priority of the goals we
>> > > > > > > > > > > >>>>>>> want to achieve. IMO, it is a hard requirement for the user-facing
>> > > > > > > > > > > >>>>>>> API to be clearly defined, and users should be able to use the API
>> > > > > > > > > > > >>>>>>> without concern of regression. And this requirement is more
>> > > > > > > > > > > >>>>>>> important than the other goals discussed above because it is related
>> > > > > > > > > > > >>>>>>> to the stability/performance of the production job. What do you
>> > > > > > > > > > > >>>>>>> think?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>> Sounds good. Looking forward to learning more ideas.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> I have thought about this a bit more, and I think we don't need to
>> > > > > > > > > > > >>>>>>>> check for the backpressure status, or how much overloaded all of
>> > > > > > > > > > > >>>>>>>> the operators are. We could just check three things for source
>> > > > > > > > > > > >>>>>>>> operators:
>> > > > > > > > > > > >>>>>>>> 1. pendingRecords (backlog length)
>> > > > > > > > > > > >>>>>>>> 2. numRecordsInPerSecond
>> > > > > > > > > > > >>>>>>>> 3. backPressuredTimeMsPerSecond
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
>> > > > > > > > > > > >>>>>>>> // The next line calculates how many records we can consume from
>> > > > > > > > > > > >>>>>>>> // the backlog, assuming that magically the reason behind a
>> > > > > > > > > > > >>>>>>>> // backpressure vanishes. We will use this only as a safeguard
>> > > > > > > > > > > >>>>>>>> // against scenarios like, for example, if backpressure was caused
>> > > > > > > > > > > >>>>>>>> // by some intermittent failure/performance degradation.
>> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
>> > > > > > > > > > > >>>>>>>> (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
>> > > > > > > > > > > >>>>>>> backPressuredTimeMsPerSecond = 0, then
>> > > > > > > > > > > >>>>>>> maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
>> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
>> > > > > > > > > > > >>>>>>> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000))
>> > > > > > > > > > > >>>>>>> * metricsUpdateInterval"?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the
>> > > > > > > > > > > >>>>>>>> // backlog as a safeguard against intermittent backpressure
>> > > > > > > > > > > >>>>>>>> // problems, so that we don't calculate the next checkpoint
>> > > > > > > > > > > >>>>>>>> // interval far, far in the future while the backpressure goes
>> > > > > > > > > > > >>>>>>>> // away before we recalculate the metrics and a new checkpointing
>> > > > > > > > > > > >>>>>>>> // interval
>> > > > > > > > > > > >>>>>>>> timeToConsumeBacklog = (pendingRecords -
>> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Then we can use those numbers to calculate the desired checkpoint
>> > > > > > > > > > > >>>>>>>> interval, for example like this:
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
>> > > > > > > > > > > >>>>>>>> // this may need some refining
>> > > > > > > > > > > >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
>> > > > > > > > > > > >>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
>> > > > > > > > > > > >>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> WDYT?
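[Editor's sketch: the interval-selection idea quoted above can be written down as a small self-contained Java example. The class and method names are made up for illustration and this is not a Flink API; it also assumes the `1 - backPressuredTimeMsPerSecond / 1000` form of the denominator discussed later in the thread, i.e. treating the backpressured time as a fraction of each second.]

```java
// Illustrative sketch only -- not an actual Flink API.
public class CheckpointIntervalSketch {

    /** Records we could drain in one metrics interval if backpressure vanished. */
    static double maxRecordsWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds) {
        // Fraction of each second NOT spent backpressured (assumed corrected formula).
        double nonBackpressuredFraction = 1.0 - backPressuredTimeMsPerSecond / 1000.0;
        return (numRecordsInPerSecond / nonBackpressuredFraction) * metricsUpdateIntervalSeconds;
    }

    /** Seconds needed to consume the remaining backlog at the current rate. */
    static double timeToConsumeBacklog(
            double pendingRecords,
            double maxRecordsWithoutBackpressure,
            double numRecordsInPerSecond) {
        return (pendingRecords - maxRecordsWithoutBackpressure) / numRecordsInPerSecond;
    }

    /** Clamp the derived interval between the fast and slow bounds. */
    static double nextCheckpointInterval(
            double timeToConsumeBacklogSeconds, double fastInterval, double slowInterval) {
        double calculated = timeToConsumeBacklogSeconds / 10; // heuristic divisor from the sketch
        return Math.min(Math.max(fastInterval, calculated), slowInterval);
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 1000 rec/s, no backpressure, 10 s metrics interval.
        double max = maxRecordsWithoutBackpressure(1000, 0, 10);   // 10_000 records
        double ttc = timeToConsumeBacklog(10_000_000, max, 1000);  // 9_990 seconds
        // min(max(180, 999), 1800) = 999 seconds
        System.out.println(nextCheckpointInterval(ttc, 180, 1800)); // prints 999.0
    }
}
```

With a fast interval of 180 s and a slow interval of 1800 s, a backlog worth under 30 minutes clamps to the fast interval, and one worth over 300 minutes clamps to the slow interval; anything in between uses backlog/10.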
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I think the idea of the above algorithm is to incline towards using
>> > > > > > > > > > > >>>>>>> the fastCheckpointInterval unless we are very sure the backlog will
>> > > > > > > > > > > >>>>>>> take a long time to process. This can alleviate the concern of
>> > > > > > > > > > > >>>>>>> regression during the continuous_unbounded phase, since we are more
>> > > > > > > > > > > >>>>>>> likely to use the fastCheckpointInterval. However, it can cause
>> > > > > > > > > > > >>>>>>> regression during the bounded phase.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I will use a concrete example to explain the risk of regression:
>> > > > > > > > > > > >>>>>>> - The user is using HybridSource to read from HDFS followed by
>> > > > > > > > > > > >>>>>>> Kafka. The data in HDFS is old and there is no need for data
>> > > > > > > > > > > >>>>>>> freshness for the data in HDFS.
>> > > > > > > > > > > >>>>>>> - The user configures the job as below:
>> > > > > > > > > > > >>>>>>>   - fastCheckpointInterval = 3 minutes
>> > > > > > > > > > > >>>>>>>   - slowCheckpointInterval = 30 minutes
>> > > > > > > > > > > >>>>>>>   - metricsUpdateInterval = 100 ms
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Using the above formula, we know that once pendingRecords <=
>> > > > > > > > > > > >>>>>>> numRecordsInPerSecond * 30 minutes, then
>> > > > > > > > > > > >>>>>>> calculatedCheckpointInterval <= 3 minutes, meaning that we will use
>> > > > > > > > > > > >>>>>>> fastCheckpointInterval as the checkpointing interval. Then, in the
>> > > > > > > > > > > >>>>>>> last 30 minutes of the bounded phase, the checkpointing frequency
>> > > > > > > > > > > >>>>>>> will be 10X higher than what the user wants.
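[Editor's sketch: the threshold arithmetic in the example quoted above can be checked with a tiny, hypothetical Java snippet; names are illustrative, not a Flink API.]

```java
// Hypothetical check of the clamping rule discussed earlier in the thread:
// nextInterval = min(max(fast, timeToConsumeBacklog / 10), slow).
public class ClampThresholdCheck {

    static double nextInterval(double backlogSeconds, double fast, double slow) {
        return Math.min(Math.max(fast, backlogSeconds / 10), slow);
    }

    public static void main(String[] args) {
        double fast = 3 * 60;   // fastCheckpointInterval = 3 minutes
        double slow = 30 * 60;  // slowCheckpointInterval = 30 minutes

        // With <= 30 minutes of backlog left, calculated <= 180 s, so the
        // clamp picks the fast interval -- the "last 30 minutes" effect.
        System.out.println(nextInterval(30 * 60, fast, slow));   // prints 180.0

        // The slow interval is only reached once the backlog would take
        // 300 minutes (10 x slow) to drain at the current rate.
        System.out.println(nextInterval(300 * 60, fast, slow));  // prints 1800.0
    }
}
```

This makes the asymmetry visible: the slow interval only kicks in for backlogs of 300+ minutes, while the final 30 minutes of any bounded phase always checkpoints at the fast interval.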
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Also note that the same issue would considerably limit the benefits
>> > > > > > > > > > > >>>>>>> of the algorithm. For example, during the continuous phase, the
>> > > > > > > > > > > >>>>>>> algorithm will only be better than the approach in FLIP-309 when
>> > > > > > > > > > > >>>>>>> there is at least 30 minutes' worth of backlog in the source.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Sure, having a slower checkpointing interval in this extreme case
>> > > > > > > > > > > >>>>>>> (where there is a 30-minute backlog in the continuous_unbounded
>> > > > > > > > > > > >>>>>>> phase) is still useful when it happens. But since this is the
>> > > > > > > > > > > >>>>>>> uncommon case, and the right solution is probably to do capacity
>> > > > > > > > > > > >>>>>>> planning to avoid this from happening in the first place, I am not
>> > > > > > > > > > > >>>>>>> sure it is worth optimizing for this case at the cost of regression
>> > > > > > > > > > > >>>>>>> in the bounded phase and the reduced operational predictability for
>> > > > > > > > > > > >>>>>>> users (e.g. what checkpointing interval should I expect at this
>> > > > > > > > > > > >>>>>>> stage of the job).
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> I think the fundamental issue with this algorithm is that it is
>> > > > > > > > > > > >>>>>>> applied to both the bounded phases and the continuous_unbounded
>> > > > > > > > > > > >>>>>>> phases without knowing which phase the job is running in. The only
>> > > > > > > > > > > >>>>>>> information it can access is the backlog. But two sources with the
>> > > > > > > > > > > >>>>>>> same amount of backlog do not necessarily have the same data
>> > > > > > > > > > > >>>>>>> freshness requirement.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> In this particular example, users know that the data in HDFS is
>> > > > > > > > > > > >>>>>>> very old and there is no need for data freshness. Users can express
>> > > > > > > > > > > >>>>>>> such signals via the per-source API proposed in the FLIP. This is
>> > > > > > > > > > > >>>>>>> why the current approach in FLIP-309 can be better in this case.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> What do you think?
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Best,
>> > > > > > > > > > > >>>>>>> Dong
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>> Piotrek
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>