Hi Ahmed,

Thanks for the comments.

I agree with you and Piotr that it would be useful to provide a more
generic approach to address more use-case in one proposal. On the other
hand, I also think it is important to make sure that the alternative (more
generic) approach can indeed address the extra use-cases reliably as
expected. Then we can compare the pros/cons of these approaches and make
the best choice for Flink users.

If I understand your question correctly, you are asking whether it would be
better to replace upperBoundCheckpointingIntervalForLastSource() with an
API on the source/operator interface.

The short answer is probably no. This is because the expected users of the
API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
are end-users who use Flink API and connector API to develop Flink job. We
probably don't want end-users to directly use the source/operator
interface, which is generally more complicated and intended to be used by
developers of source operators.

FLIP-309 currently proposes to add the API
*SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
checkpointing interval. Are you suggesting that we should replace this API
with a config on the source or operator constructor?

This approach probably works for HybridSource. But I am not sure it works
for MySQL CDC Source (which is also mentioned in the latest FLIP-309
motivation section), which is implemented as one source operator rather
than multiple source operators (which HybridSource does). And we need to
enable the new checkpointing interval in the middle of this source
operator's execution.

If I misunderstood your suggestion, can you provide more details regarding
the proposed API and explain its benefits?

Best,
Dong



On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Dong,
> Thanks for the great proposal.
> The thread is very intuitive along with suggestions from Jing and Piotr.
> As much as I like the simplicity of the proposed approach I think a much
> wider benefit is achieved by taking a more generic approach similar to
> Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> solidifies the argument you are discussing
> >  On the other hand, the approach currently proposed in the FLIP is much
> simpler as it does not depend on backpressure. Users specify the extra
> interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
> Source) and can easily know the checkpointing interval will be used on the
> continuous phase of the corresponding source.
>
> where the base HybridSource can use a `CheckpointTrigger` that doesn't
> depend on backpressure.
>
>
>
>
> I have a couple of questions for clarification.
>
> @Dong
> Do you think in the solution in FLIP 309, instead of using
> ```
> /**
>          * Upper-bound the checkpointing interval when the last source
> added right before this
>          * method invocation is reading data.
>          */
>         public <ToEnumT extends SplitEnumerator, NextSourceT extends
> Source<T,
> ?, ?>>
>                 HybridSourceBuilder<T, ToEnumT>
> upperBoundCheckpointingIntervalForLastSource(
>                         Duration duration) {
>             ...
>         }
> ```
>
> We can have an upperBoundCheckpointingInterval configured in the Source
> Interface, or even better in the Operator one.
> then we can easily implement the one for HybridSource by relying on
> delegation to the `currentReader`.
>
>
> @Piotr
>
> Regarding the more general approach of adjusting based on generic
> triggers/backpressure metrics. I saw you mentioned the resemblance with
> FLIP-271,
> Do you think it is worth going with the REST API proposal for dynamically
> configuring the interval hence the trigger logic could be implemented on
> Flink or external systems like Flink Kubernetes Operator?
> Wdyt? I think the REST API proposal here sounds more and more interesting.
>
>
> Best Regards,
> Ahmed Hamdy
>
>
> On Wed, 31 May 2023 at 07:59, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > First of all we don't need to send any extra signal from source (or non
> > > source) operators. All of the operators are already reporting
> > backpressured
> > > metrics [1]
> > > and all of the metrics are already sent to JobManager. We would only
> need
> > >
> >
> > Hmm... I am not sure metrics such as isBackPressured are already sent to
> > JM. According to the doc
> > <
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > >,
> > this metric is only available on TaskManager. And I could't find the code
> > that sends these metrics to JM. Can you help provide link to the code and
> > doc that shows this metric is reported to JM.
> >
> > Suppose this metric is indeed reported to JM, we also need to confirm
> that
> > the frequency meets our need. For example, typically metrics are updated
> on
> > the order of seconds. The default metric reporter interval (as specified
> in
> > MetricOptions) is 10 seconds, which is probably not sufficient for the
> > suggested approach to work reliably. This is because the longer the
> > interval, the more likely that the algorithm will not trigger checkpoint
> > using the short interval even if all subtasks are not-backpressured.
> >
> > For example, let's say every source operator subtask reports this metric
> to
> > JM once every 10 seconds. There are 100 source subtasks. And each subtask
> > is backpressured roughly 10% of the total time due to traffic spikes (and
> > limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> > chance that there is at least one subtask that is backpressured. Then we
> > have to wait for at least 10 seconds to check again. The expected
> > checkpointing interval can be very close to 30 minutes in the use-case
> > mentioned earlier.
> >
> > to pass some accessor to the metrics to the `CheckpointTrigger`.
> >
> >
> > > > execution.checkpointing.interval.no-backpressure
> > >
> > > Maybe that's the way to go, but as I mentioned before, I could see this
> > > `CheckpointTrigger` to be a pluggable component, that could have been
> > > configured
> > > the same way as `MetricReporters` are right now [2]. We could just
> > provide
> > > out of the box two plugins, one implementing current checkpoint
> > triggering
> > > strategy,
> > > and the other using backpressure.
> > >
> >
> > Yes, it is possible to add a CheckpointTrigger as a pluggable component.
> I
> > am open to this idea as long as it provides benefits over the job-level
> > config (e.g. covers more use-case, or simpler configuration for
> > common-case).
> >
> > I think we can decide how to let user specify this interval after we are
> > able to address the other issues related to the feasibility and
> reliability
> > of the suggested approach.
> >
> >
> > > > I think the root cause of this issue is that the decision of the
> > > > checkpointing interval really depends on the expected impact of a
> > > > checkpoint on the throughput.
> > >
> > > Yes, I agree. Ideally we probably should adjust the checkpointing
> > interval
> > > based on measured latency, for example using latency markers [3], but
> > that
> > > would
> > > require some investigation if latency markers are indeed that costly as
> > > documented and if so optimizing them to solve the performance
> degradation
> > > of enabling
> > > e2e latency tracking.
> >
> >
> > > However, given that the new back pressure monitoring strategy would be
> > > optional AND users could implement their own `CheckpointTrigger` if
> > really
> > > needed
> > > AND I have a feeling that there might be an even better solution (more
> > > about that later).
> > >
> >
> > Overall I guess you are suggesting that 1) we can optimize the overhead
> of
> > latency tracking so that we can always turn it on and 2) we can use the
> > measured latency to dynamically determine checkpointing interval.
> >
> > I can understand this intuition. Still, the devil is in the details.
> After
> > thinking more about this, I am not sure I can find a good way to make it
> > work. I am happy to discuss proc/cons if you provide more concrete
> > solutions.
> >
> > Note that goals of the alternative approach include 1) support sources
> > other than HybridSource and 2) reduce checkpointing interval when the job
> > is backpressured. These goals are not necessary to achieve the use-case
> > targed by FLIP-309. While it will be nice to support additional use-cases
> > with one proposal, it is probably also reasonable to make incremental
> > progress and support the low-hanging-fruit use-case first. The choice
> > really depends on the complexity and the importance of supporting the
> extra
> > use-cases.
> >
> > I am hoping that we can still be open to using the approach proposed in
> > FLIP-309 and we can not make the alternative approach work. What do you
> > think?
> >
> >
> > > > if the checkpointing overhead is
> > > > close to none, then it is beneficial to the e2e latency to still
> > > checkpoint
> > > > a high frequency even if there exists (intermittent) backpressure.
> > >
> > > In that case users could just configure a slow checkpointing interval
> to
> > a
> > > lower value, or just use static checkpoint interval strategy.
> > >
> >
> > I guess the point is that the suggested approach, which dynamically
> > determines the checkpointing interval based on the backpressure, may
> cause
> > regression when the checkpointing interval is relatively low. This makes
> it
> > hard for users to enable this feature in production. It is like an
> > auto-driving system that is not guaranteed to work
> >
> > On the other hand, the approach currently proposed in the FLIP is much
> > simpler as it does not depend on backpressure. Users specify the extra
> > interval requirement on the specific sources (e.g. HybridSource, MySQL
> CDC
> > Source) and can easily know the checkpointing interval will be used on
> the
> > continuous phase of the corresponding source. This is pretty much same as
> > how users use the existing execution.checkpointing.interval config. So
> > there is no extra concern of regression caused by this approach.
> >
> >
> > >
> > > > With the suggested approach, the e2e latency introduced by Flink is
> > > roughly
> > > > 72 seconds. This is because it takes 1 minute for 11MBps phase to
> end,
> > > and
> > > > another 12 seconds for the accumulated backlog to be cleared. And
> Flink
> > > can
> > > > not do checkpoint before the backlog is cleared.
> > >
> > > Indeed that's a valid concern. After thinking more about this issue,
> > maybe
> > > the proper solution would be to calculate "how much overloaded is the
> > most
> > > overloaded subtask".
> > > In this case, that would be 10% (we are trying to push 110% of the
> > > available capacity in the current job/cluster). Then we could use that
> > > number as some kind of weighted average.
> > > We could figure out a function mapping the overload percentage, into a
> > > floating point number from range [0, 1]
> > >
> > > f(overload_factor) = weight // weight is from [0, 1]
> > >
> > > and then the desired checkpoint interval would be something like
> > >
> > > (1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval
> > >
> > > In your problematic example, we would like the weight to be pretty
> small
> > > (<10%?), so the calculated checkpoint interval would be pretty close to
> > the
> > > fastCheckpointInterval.
> > >
> >
> > Hmm... I am not sure it will always be pretty close to the
> > fastCheckpointInterval. We can discuss when there is concrete definition
> of
> > this algorithm.
> >
> > While each source subtask can measure its current throughput, I am not
> sure
> > it can measure the "input throughput", which is defined as the throughput
> > when the subtask (and it downstream operators) as the unlimited
> processing
> > capacity. Therefore, it seems pretty hard to determine the
> > "overload_factor" timely and accurately.
> >
> >
> > > The overload factor we could calculate the same way as FLIP-271 is
> > > calculating how much should we rescale given operator [4].
> > >
> > > I can think about this more and elaborate/refine this idea tomorrow.
> > >
> >
> > Sounds good. Looking forward to learning more ideas.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best,
> > > Piotrek
> > >
> > >
> > > [1]
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > > [2]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > [3]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
> > > [4]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> > >
> > > wt., 30 maj 2023 o 13:58 Dong Lin <lindon...@gmail.com> napisał(a):
> > >
> > > > Hi Piotr,
> > > >
> > > > Thank you for providing those details.
> > > >
> > > > I understand you suggested using the existing "isBackPressured"
> signal
> > to
> > > > determine whether we should use the less frequent checkpointing
> > > interval. I
> > > > followed your thoughts and tried to make it work. Below are the
> issues
> > > that
> > > > I am not able to address. Can you see if there is a way to address
> > these
> > > > issues?
> > > >
> > > > Let's will use the following use-case to make the discussion more
> > > concrete:
> > > > a) Users want to checkpoint at least once every 30 minutes to
> > upper-bound
> > > > the amount of duplicate work after job failover.
> > > > b) Users want to checkpoint at least once every 30 seconds to
> > > > upper-bound *extra
> > > > e2e lag introduced by the Flink job* during the continuous processing
> > > > phase.
> > > >
> > > > The suggested approach is designed to do this:
> > > > - If any of the source subtasks is backpressured, the job will
> > checkpoint
> > > > at 30-minutes interval.
> > > > - If none of the source subtasks is backpressured, the job will
> > > checkpoint
> > > > at 30-seconds interval.
> > > >
> > > > And we would need to add the following public APIs to implement this
> > > > approach:
> > > > - Add a job level config, maybe
> > > > execution.checkpointing.interval.no-backpressure. This is the
> > > checkpointing
> > > > interval when none of the source subtasks is backpressured.
> > > > - Add a public API for source operator subtasks to report their
> > > > backpressure status to the checkpointing coordinator. The subtask
> > should
> > > > invoke this API whenever its backpressure status changed.
> > > >
> > > > Now, in order to make the suggested approach work for all users (i.e.
> > no
> > > > regression), we need to make sure that whenever we use the 30-minutes
> > > > checkpointing interval, the e2e latency will be less than or equal to
> > the
> > > > case where we use the 30-seconds checkpointing interval.
> > > >
> > > > I thought about this in detail, and found the following fabricated
> > > > scenarios where this approach might cause regression:
> > > >
> > > > During the continuous processing phase, the input throughput is 5MBps
> > > for 1
> > > > minute, and 11MBps for 1 minutes, in lock-steps. The maximum
> throughput
> > > > achievable by this job is 10Mbps. For simplicity, suppose the buffer
> > size
> > > > can hold roughly 1 second worth-of-data, then the job is
> > > > backpressured roughly 1 minutes out of every 2 minutes.
> > > >
> > > > With the suggested approach, the e2e latency introduced by Flink is
> > > roughly
> > > > 72 seconds. This is because it takes 1 minute for 11MBps phase to
> end,
> > > and
> > > > another 12 seconds for the accumulated backlog to be cleared. And
> Flink
> > > can
> > > > not do checkpoint before the backlog is cleared.
> > > >
> > > > On the other hand, if we continue to checkpoint at 30-seconds
> interval,
> > > the
> > > > e2e latency introduced by Flink is at most 42 seconds, plus the extra
> > > delay
> > > > introduced by the checkpoint overhead. The e2e latency will be better
> > > than
> > > > the suggested approach, if the impact of the checkpoint is less than
> 30
> > > > seconds.
> > > >
> > > > I think the root cause of this issue is that the decision of the
> > > > checkpointing interval really depends on the expected impact of a
> > > > checkpoint on the throughput. For example, if the checkpointing
> > overhead
> > > is
> > > > close to none, then it is beneficial to the e2e latency to still
> > > checkpoint
> > > > a high frequency even if there exists (intermittent) backpressure.
> > > >
> > > > Here is another fabricated use-case where the suggested approach
> might
> > > > cause regression. Let's say user's job is
> > > > *hybridSource.keyBy(...).transform(operatorA).sinkTo(PaimonSink)*.
> The
> > > > parallelism is 2. As we can see, there is all-to-all edge between
> > source
> > > > and operatorA. And due to limited resources (e.g. buffer), at any
> given
> > > > time, each operatorA subtask can only process data from one of its
> > > upstream
> > > > subtask at a time, meaning that the other upstream subtask will be
> > > > backpressured. So there might always be at least one source subtask
> > that
> > > is
> > > > backpressured even though the job's throughput can catch up with the
> > > input
> > > > throughput. However, the suggested approach might end up always using
> > the
> > > > less frequent checkpointing interval in this case.
> > > >
> > > > Suppose we can find a way to address the above issues, another issue
> > with
> > > > the suggested approach is the extra communication overhead between
> the
> > > > source operator subtasks and the checkpointing coordinator. The
> source
> > > > subtask needs to send a message to checkpointing coordinator whenever
> > its
> > > > backpressure status changes. The more frequently we check (e.g. once
> > > every
> > > > 10 ms), the larger the overhead. And if we check not so frequently
> > (e.g.
> > > > once every second), we might be more vulnerable to random/occasional
> > > > backpressure. So there seems to be tradeoff between the reliability
> and
> > > the
> > > > cost of this approach.
> > > >
> > > > Thanks again for the suggestion. I am looking forward to your
> comments.
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > > On Tue, May 30, 2023 at 4:37 PM Piotr Nowojski <pnowoj...@apache.org
> >
> > > > wrote:
> > > >
> > > > > Hi again,
> > > > >
> > > > > Thanks Dong, yes I think your concerns are valid, and that's why I
> > have
> > > > > previously refined my idea to use one of the backpressure measuring
> > > > metrics
> > > > > that we already have.
> > > > > Either simply `isBackPressured == true` check [1], or
> > > > > `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That
> > would
> > > > > address your three first concerns:
> > > > >   - lack of event time
> > > > >   - event time unreliability
> > > > >   - lack of universal threshold value for `pendingRecords`
> > > > >
> > > > > In a bit more detail, we probably should check (using [1] or [2])
> > > either:
> > > > >   a) if any of the source subtasks is backpressured
> > > > >   b) if any of the subtasks is backpressured
> > > > >
> > > > > In most cases a == b. The only time when that's not true, if some
> > > > windowed
> > > > > operator in the middle of the job graph started triggering so many
> > > > results
> > > > > that it became backpressured,
> > > > > but the backpressure didn't last long enough to propagate to
> sources.
> > > For
> > > > > example that especially might occur if sources are idle. So
> probably
> > b)
> > > > is
> > > > > a better and more generic option.
> > > > >
> > > > > Regarding your last concern, with spiky traffic, I think the
> > following
> > > > > algorithm of triggering checkpoints would work pretty well:
> > > > >
> > > > > public BackpressureDetectingCheckpointTrigger {
> > > > >
> > > > > private long lastCheckpointTs = System.currentTimeMillis();
> > > > > private long slowCheckpointInterval = ...;
> > > > > private long fastCheckpointInteveral = ...;
> > > > >
> > > > > //code executed periodically, for example once a second, once every
> > > 10ms,
> > > > > or at the 1/10th of the fast checkpoint interval
> > > > > void maybeTriggerCheckpoint(...) {
> > > > >
> > > > >   long nextCheckpointTs = lastCheckpointTs;
> > > > >   if (isAnySubtaskBackpressured()) {
> > > > >     nextCheckpointTs += slowCheckpointInterval;
> > > > >   }
> > > > >   else {
> > > > >       nextCheckpointTs += fastCheckpointInterval;
> > > > >   }
> > > > >
> > > > >   if (nextCheckpointTs >= System.currentTimeMillis()) {
> > > > >     triggerCheckpoint();
> > > > >     lastCheckpointTs = System.currentTimeMillis();
> > > > >   }
> > > > > }
> > > > > }
> > > > >
> > > > > This way, if there is a spike of backpressure, it doesn't matter
> that
> > > > much.
> > > > > If the backpressure goes away until the next iteration, the next
> > check
> > > > will
> > > > > trigger a checkpoint according to the
> > > > > fast interval. The slow checkpoint interval will be used only if
> the
> > > > > backpressure persists for the whole duration of the
> > > > slowCheckpointInterval.
> > > > >
> > > > > We could also go a little bit more fancy, and instead of using only
> > > fast
> > > > or
> > > > > slow intervals, we could use a continuous spectrum to gradually
> > adjust
> > > > the
> > > > > interval, by replacing the first if/else
> > > > > check with a weighted average:
> > > > >
> > > > >   int maxBackPressureTime =
> > > getSubtaskMaxBackPressuredTimeMsPerSecond();
> > > > >   long nextCheckpointTs = lastCheckpointTs +
> slowCheckpointInterval *
> > > > > maxBackPressureTime + fastCheckpointInterval * (1000 -
> > > > > maxBackPressureTime);
> > > > >
> > > > > This would further eliminate some potential jitter and make the
> > actual
> > > > > checkpoint interval a bit more predictable.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > >
> > > > > wt., 30 maj 2023 o 04:40 Dong Lin <lindon...@gmail.com>
> napisał(a):
> > > > >
> > > > > > Let me correct the typo in the last paragraph as below:
> > > > > >
> > > > > > To make the problem even harder, the incoming traffic can be
> spiky.
> > > And
> > > > > the
> > > > > > overhead of triggering checkpointing can be relatively low, in
> > which
> > > > case
> > > > > > it might be more performant (w.r.t. e2e lag) for the Flink job to
> > > > > > checkpoint at the more frequent interval in the continuous phase
> in
> > > > face
> > > > > of
> > > > > > a spike in the number of pending records buffered in the source
> > > > operator.
> > > > > >
> > > > > >
> > > > > > On Tue, May 30, 2023 at 9:17 AM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Piotrek,
> > > > > > >
> > > > > > > Thanks for providing more details of the alternative approach!
> > > > > > >
> > > > > > > If I understand your proposal correctly, here are the
> > requirements
> > > > for
> > > > > it
> > > > > > > to work without incurring any regression:
> > > > > > >
> > > > > > > 1) The source needs a way to determine whether there exists
> > > > > backpressure.
> > > > > > > 2) If there is backpressure, then it means e2e latency is
> already
> > > > high
> > > > > > > and there should be no harm to use the less frequent
> > checkpointing
> > > > > > interval.
> > > > > > > 3) The configuration of the "less frequent checkpointing
> > interval"
> > > > > needs
> > > > > > > to be a job-level config so that it works for sources other
> than
> > > > > > > HybridSource.
> > > > > > >
> > > > > > > I would say that if we can find a way for the source to
> determine
> > > the
> > > > > > > "existence of backpressure" and meet the requirement 2), it
> would
> > > > > indeed
> > > > > > be
> > > > > > > a much more elegant approach that solves more use-cases.
> > > > > > >
> > > > > > > The devil is in the details. I am not sure how to determine the
> > > > > > "existence
> > > > > > > of backpressure". Let me explain my thoughts and maybe you can
> > help
> > > > > > > provide the answers.
> > > > > > >
> > > > > > > To make the discussion more concrete, let's say the input
> records
> > > do
> > > > > not
> > > > > > > have event timestamps. Users want to checkpoint at least once
> > every
> > > > 30
> > > > > > > minutes to upper-bound the amount of duplicate work after job
> > > > failover.
> > > > > > And
> > > > > > > users want to checkpoint at least once every 30 seconds to
> > > > upper-bound
> > > > > > *extra
> > > > > > > e2e lag introduced by the Flink job* during the continuous
> > > processing
> > > > > > > phase.
> > > > > > >
> > > > > > > Since the input records do not have event timestamps, we can
> not
> > > rely
> > > > > on
> > > > > > > metrics such as currentFetchEventTimeLag [1] to determine the
> > > > absolute
> > > > > > e2e
> > > > > > > lag, because currentFetchEventTimeLag depends on the existence
> of
> > > > event
> > > > > > > timestamps.
> > > > > > >
> > > > > > > Also note that, even if the input records have event timestamps
> > and
> > > > we
> > > > > > can
> > > > > > > measure currentFetchEventTimeLag, we still need a threshold to
> > > > > determine
> > > > > > > whether the value of currentFetchEventTimeLag is too high. One
> > idea
> > > > > might
> > > > > > > be to use the user-specified "less frequent checkpointing
> > interval"
> > > > as
> > > > > > > this threshold, which in this case is 30 seconds. But this
> > approach
> > > > can
> > > > > > > also cause regression. For example, let's say the records go
> > > through
> > > > > > > several Kafka/MirrorMaker pipelines after it is generated and
> > > before
> > > > it
> > > > > > is
> > > > > > > received by Flink, causing its currentFetchEventTimeLag to be
> > > always
> > > > > > higher
> > > > > > > than 30 seconds. Then Flink will end up always using the "less
> > > > frequent
> > > > > > > checkpointing interval" in the continuous phase, which in this
> > case
> > > > is
> > > > > 30
> > > > > > > minutes.
> > > > > > >
> > > > > > > Other options to determine the "existence of backpressure"
> > includes
> > > > > using
> > > > > > > the absolute number of records in the source storage system
> that
> > > are
> > > > > > > waiting to be fetched (e.g. pendingRecords [1]), or using the
> > > > absolute
> > > > > > > number of buffered records in the source output queue.
> However, I
> > > > find
> > > > > it
> > > > > > > hard to reliably determine "e2e latency is already high" based
> on
> > > the
> > > > > > > absolute number of records. What threshold should we choose to
> > > > > determine
> > > > > > > that the number of pending records is too many (and it is safe
> to
> > > > > > increase
> > > > > > > the checkpointing interval)?
> > > > > > >
> > > > > > > To make the problem even harder, the incoming traffic can be
> > spiky.
> > > > And
> > > > > > > the overhead of triggering checkpointing can be relative low,
> in
> > > > which
> > > > > > case
> > > > > > > it might be more performance (w.r.t. e2e lag) for the Flink job
> > to
> > > > > > > checkpoint at the higher interval in the continuous phase in
> face
> > > of
> > > > a
> > > > > > > spike in the number of pending records buffered in the source
> > > > operator.
> > > > > > >
> > > > > > > The problems described above are the main reasons that I can
> not
> > > > find a
> > > > > > > way to make the alternative approach work. Any thoughts?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >
> > > > > > >
> > > > > > > On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski <
> > > > pnowoj...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi
> > > > > > >>
> > > > > > >> @Jing
> > > > > > >>
> > > > > > >> > Your proposal to dynamically adjust the checkpoint intervals
> > is
> > > > > > elegant!
> > > > > > >> It
> > > > > > >> > makes sense to build it as a generic feature in Flink.
> Looking
> > > > > forward
> > > > > > >> to
> > > > > > >> > it. However, for some user cases, e.g. when users were aware
> > of
> > > > the
> > > > > > >> bounded
> > > > > > >> > sources (in the HybridSource) and care more about the
> > > throughput,
> > > > > the
> > > > > > >> > dynamic adjustment might not be required. Just let those
> > bounded
> > > > > > sources
> > > > > > >> > always have larger checkpoint intervals even when there is
> no
> > > back
> > > > > > >> > pressure. Because no one cares about latency in this case,
> > let's
> > > > > turn
> > > > > > >> off
> > > > > > >> > the dynamic adjustment, reduce the checkpoint frequency,
> have
> > > > better
> > > > > > >> > throughput, and save unnecessary source consumption. Did I
> > miss
> > > > > > anything
> > > > > > >> > here?
> > > > > > >>
> > > > > > >> But why do we need to have two separate mechanisms, if the
> > dynamic
> > > > > > >> adjustment based on the backpressure/backlog would
> > > > > > >> achieve basically the same goal as your proposal and would
> solve
> > > > both
> > > > > of
> > > > > > >> the problems? Having two independent solutions
> > > > > > >> in the same codebase, in the docs, that are achieving
> basically
> > > the
> > > > > same
> > > > > > >> thing is far from ideal. It would increase both the
> > > > > > >> complexity of the system and confuse potential users.
> > > > > > >>
> > > > > > >> Moreover, as I have already mentioned before, I don't like the
> > > > current
> > > > > > >> proposal as it's focusing ONLY on the HybridSource,
> > > > > > >> which can lead to even worse problem in the future, where many
> > > > > different
> > > > > > >> sources would have each a completely custom
> > > > > > >> solution to solve the same/similar problems, complicating the
> > > system
> > > > > and
> > > > > > >> confusing the users even more.
> > > > > > >>
> > > > > > >> @Dong,
> > > > > > >>
> > > > > > >> > For now I am not able to come up with a good way to support
> > > this.
> > > > I
> > > > > am
> > > > > > >> happy to discuss the
> > > > > > >> > pros/cons if you can provide more detail (e.g. API design)
> > > > regarding
> > > > > > how
> > > > > > >> to support this approach
> > > > > > >>
> > > > > > >> I have already described such proposal:
> > > > > > >>
> > > > > > >> > Piotr:
> > > > > > >> > I don't know, maybe instead of adding this logic to operator
> > > > > > >> coordinators, `CheckpointCoordinator` should have a pluggable
> > > > > > >> `CheckpointTrigger`,
> > > > > > >> > that the user could configure like a `MetricReporter`. The
> > > default
> > > > > one
> > > > > > >> would be just periodically triggering checkpoints. Maybe
> > > > > > >> > `BacklogDynamicCheckpointTrigger` could look at metrics[1],
> > > check
> > > > if
> > > > > > >> `pendingRecords` for some source has exceeded the configured
> > > > > > >> > threshold and based on that adjust the checkpointing
> interval
> > > > > > >> accordingly? This would at least address some of my concerns.
> > > > > > >>
> > > > > > >> plus
> > > > > > >>
> > > > > > >> > Piotr:
> > > > > > >> >  Either way, I would like to refine my earlier idea, and
> > instead
> > > > of
> > > > > > >> using
> > > > > > >> metrics like `pendingRecords`, I think we could switch between
> > > fast
> > > > > and
> > > > > > >> > slow checkpointing intervals based on the information if the
> > job
> > > > is
> > > > > > >> backpressured or not. My thinking is as follows:
> > > > > > >> >
> > > > > > >> > As a user, I would like to have my regular fast
> checkpointing
> > > > > interval
> > > > > > >> for low latency, but the moment my system is not keeping up,
> if
> > > the
> > > > > > >> backpressure
> > > > > > >> > builds up, or simply we have a huge backlog to reprocess,
> > > latency
> > > > > > >> doesn't
> > > > > > >> matter anymore. Only throughput matters. So I would like the
> > > > > > checkpointing
> > > > > > >> to slow down.
> > > > > > >> >
> > > > > > >> > I think this should cover pretty well most of the cases,
> what
> > do
> > > > you
> > > > > > >> think? If this backpressured based behaviour is still not
> > enough,
> > > I
> > > > > > would
> > > > > > >> still say
> > > > > > >> > that we should provide plugable checkpoint triggering
> > > controllers
> > > > > that
> > > > > > >> would work based on metrics.
> > > > > > >>
> > > > > > >> > change the checkpointing interval based on the "backlog
> > signal",
> > > > > > >>
> > > > > > >> What's wrong with the job being backpressured? If job is
> > > > > backpressured,
> > > > > > we
> > > > > > >> don't care about individual records latency, only about
> > increasing
> > > > > > >> the throughput to get out of the backpressure situation ASAP.
> > > > > > >>
> > > > > > >> > In the mentioned use-case, users want to have two different
> > > > > > >> checkpointing
> > > > > > >> > intervals at different phases of the HybridSource. We should
> > > > provide
> > > > > > an
> > > > > > >> API
> > > > > > >> > for users to express the extra checkpointing interval in
> > > addition
> > > > to
> > > > > > the
> > > > > > >> > existing execution.checkpointing.interval. What would be the
> > > > > > definition
> > > > > > >> of
> > > > > > >> > that API with this alternative approach?
> > > > > > >>
> > > > > > >> I think my proposal with `BacklogDynamicCheckpointTrigger` or
> > > > > > >> `BackpressureDetectingCheckpointTrigger` would solve your
> > > motivating
> > > > > use
> > > > > > >> case
> > > > > > >> just as well.
> > > > > > >>
> > > > > > >> 1. In the catch up phase (reading the bounded source):
> > > > > > >>   a) if we are under backpressure (common case), system would
> > > > fallback
> > > > > > to
> > > > > > >> the less frequent checkpointing interval
> > > > > > >>   b) if there is no backpressure (I hope a rare case, there
> is a
> > > > > > backlog,
> > > > > > >> but the source is too slow), Flink cluster has spare resources
> > to
> > > > > > actually
> > > > > > >> run more
> > > > > > >>       frequent checkpointing interval. No harm should be done.
> > But
> > > > > > >> arguably
> > > > > > >> using a less frequent checkpointing interval here should be
> more
> > > > > > >> desirable.
> > > > > > >>
> > > > > > >> 2. In the continuous processing phase (unbounded source)
> > > > > > >>   a) if we are under backpressure, as I mentioned above, no
> one
> > > > cares
> > > > > > >> about
> > > > > > >> checkpointing interval and the frequency of committing records
> > to
> > > > the
> > > > > > >>       output, as e2e latency is already high due to the
> backlog
> > in
> > > > the
> > > > > > >> sources
> > > > > > >>   b) if there is no backpressure, that's the only case where
> the
> > > > user
> > > > > > >> actually cares about the frequency of committing records to
> the
> > > > > output,
> > > > > > we
> > > > > > >> are
> > > > > > >>       using the more frequent checkpointing interval.
> > > > > > >>
> > > > > > >> 1b) I think is mostly harmless, and I think could be solved
> with
> > > > some
> > > > > > >> extra
> > > > > > >> effort
> > > > > > >> 2a) and 2b) are not solved by your proposal
> > > > > > >> 2a) and 2b) are applicable to any source, not just
> HybridSource,
> > > > which
> > > > > > is
> > > > > > >> also not covered by your proposal.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Piotrek
> > > > > > >>
> > > > > > >>
> > > > > > >> czw., 25 maj 2023 o 17:29 Jing Ge <j...@ververica.com.invalid
> >
> > > > > > >> napisał(a):
> > > > > > >>
> > > > > > >> > Hi Dong, Hi Piotr,
> > > > > > >> >
> > > > > > >> > Thanks for the clarification.
> > > > > > >> >
> > > > > > >> > @Dong
> > > > > > >> >
> > > > > > >> > According to the code examples in the FLIP, I thought we are
> > > > > focusing
> > > > > > on
> > > > > > >> > the HybridSource scenario. With the current HybridSource
> > > > > > >> implementation, we
> > > > > > >> > don't even need to know the boundedness of sources in the
> > > > > > HybridSource,
> > > > > > >> > since all sources except the last one must be bounded[1],
> i.e.
> > > > only
> > > > > > the
> > > > > > >> > last source is unbounded. This makes it much easier to set
> > > > different
> > > > > > >> > intervals to sources with different boundedness.
> > > > > > >> >
> > > > > > >> > Boundedness in Flink is a top level concept. I think it
> should
> > > be
> > > > ok
> > > > > > to
> > > > > > >> > introduce a top level config for the top level concept. I am
> > not
> > > > > > >> familiar
> > > > > > >> > with MySQL CDC. For those specific cases, you are right,
> your
> > > > > proposal
> > > > > > >> can
> > > > > > >> > provide the feature with minimal changes, like I mentioned
> > > > > previously,
> > > > > > >> it
> > > > > > >> > is a thoughtful design.  +1
> > > > > > >> >
> > > > > > >> > @Piotr
> > > > > > >> >
> > > > > > >> > > For example join (windowed/temporal) of two tables backed
> > by a
> > > > > > hybrid
> > > > > > >> > > source? I could easily see a scenario where one table with
> > > > little
> > > > > > data
> > > > > > >> > > catches up much more quickly.
> > > > > > >> >
> > > > > > >> > I am confused. I thought we were talking about HybridSource
> > > which
> > > > > > >> "solves
> > > > > > >> > the problem of sequentially reading input from heterogeneous
> > > > sources
> > > > > > to
> > > > > > >> > produce a single input stream."[2]
> > > > > > >> > I could not find any join within a HybridSource. So, your
> > might
> > > > mean
> > > > > > >> > something else the join example and it should be out of the
> > > scope,
> > > > > if
> > > > > > I
> > > > > > >> am
> > > > > > >> > not mistaken.
> > > > > > >> >
> > > > > > >> > > About the (un)boundness of the input stream. I'm not sure
> if
> > > > that
> > > > > > >> should
> > > > > > >> > > actually matter. Actually the same issue, with two
> frequent
> > > > > > >> checkpointing
> > > > > > >> > > during a catch up period or when Flink is overloaded,
> could
> > > > affect
> > > > > > >> jobs
> > > > > > >> > > that are purely unbounded, like continuously reading from
> > > Kafka.
> > > > > > Even
> > > > > > >> > more,
> > > > > > >> > > nothing prevents users from actually storing bounded data
> > in a
> > > > > Kafka
> > > > > > >> > topic.
> > > > > > >> > > Either way, I would like to refine my earlier idea, and
> > > instead
> > > > of
> > > > > > >> using
> > > > > > >> > > metrics like `pendingRecords`, I think we could switch
> > between
> > > > > fast
> > > > > > >> and
> > > > > > >> > > slow checkpointing intervals based on the information if
> the
> > > job
> > > > > is
> > > > > > >> > > backpressured or not. My thinking is as follows:
> > > > > > >> >
> > > > > > >> > This is again a very different use case as HybridSource.
> Users
> > > do
> > > > > > allow
> > > > > > >> > storing bounded data in a Kafka and if it is not used as the
> > > last
> > > > > > >> source in
> > > > > > >> > a HybridSource, it is a bounded source and can still benefit
> > > from
> > > > > > larger
> > > > > > >> > checkpoint interval wrt the high throughput (Kafka or any
> > other
> > > > > > storage
> > > > > > >> > does not matter). BTW, the larger checkpoint interval for
> > > bounded
> > > > > > >> source is
> > > > > > >> > optional, users can use it but must not use it, if they
> don't
> > > care
> > > > > > about
> > > > > > >> > the throughput with bounded data.
> > > > > > >> >
> > > > > > >> > Your proposal to dynamically adjust the checkpoint intervals
> > is
> > > > > > >> elegant! It
> > > > > > >> > makes sense to build it as a generic feature in Flink.
> Looking
> > > > > forward
> > > > > > >> to
> > > > > > >> > it. However, for some user cases, e.g. when users were aware
> > of
> > > > the
> > > > > > >> bounded
> > > > > > >> > sources (in the HybridSource) and care more about the
> > > throughput,
> > > > > the
> > > > > > >> > dynamic adjustment might not be required. Just let those
> > bounded
> > > > > > sources
> > > > > > >> > always have larger checkpoint intervals even when there is
> no
> > > back
> > > > > > >> > pressure. Because no one cares about latency in this case,
> > let's
> > > > > turn
> > > > > > >> off
> > > > > > >> > the dynamic adjustment, reduce the checkpoint frequency,
> have
> > > > better
> > > > > > >> > throughput, and save unnecessary source consumption. Did I
> > miss
> > > > > > anything
> > > > > > >> > here?
> > > > > > >> >
> > > > > > >> > Best regards,
> > > > > > >> > Jing
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > [1]
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
> > > > > > >> > [2]
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, May 25, 2023 at 3:03 PM Dong Lin <
> lindon...@gmail.com
> > >
> > > > > wrote:
> > > > > > >> >
> > > > > > >> > > Hi Piotr,
> > > > > > >> > >
> > > > > > >> > > Thanks for the discussion. Please see my comments inline.
> > > > > > >> > >
> > > > > > >> > > On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski <
> > > > > > pnowoj...@apache.org>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi all,
> > > > > > >> > > >
> > > > > > >> > > > Thanks for the discussion.
> > > > > > >> > > >
> > > > > > >> > > > @Dong
> > > > > > >> > > >
> > > > > > >> > > > > In the target use-case, we would like to HybridSource
> to
> > > > > > trigger>
> > > > > > >> > > > checkpoint more frequently when it is read the Kafka
> > Source
> > > > > (than
> > > > > > >> when
> > > > > > >> > it
> > > > > > >> > > > > is reading the HDFS source). We would need to set a
> flag
> > > for
> > > > > the
> > > > > > >> > > > checkpoint
> > > > > > >> > > > > trigger to know which source the HybridSource is
> reading
> > > > from.
> > > > > > >> > > >
> > > > > > >> > > > Is this really your actual goal? Should users care if
> some
> > > > table
> > > > > > >> > defined
> > > > > > >> > > in
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > My actual goal is to address the use-case described in the
> > > > > > motivation
> > > > > > >> > > section. More specifically,
> > > > > > >> > > my goal is to provide API that uses can use to express
> their
> > > > > needed
> > > > > > >> > > checkpointing interval
> > > > > > >> > > at different phases of the job. So that Flink can achieve
> > the
> > > > > > maximum
> > > > > > >> > > throughput while also meeting
> > > > > > >> > > users' need for data freshness and failover time.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > > a meta store is backed by HybridSource or not? I think
> the
> > > > > actual
> > > > > > >> goal
> > > > > > >> > is
> > > > > > >> > > > this:
> > > > > > >> > >
> > > > > > >> > > As a user I would like to have a self adjusting mechanism
> > for
> > > > > > >> > checkpointing
> > > > > > >> > > > intervals, so that during the catch up phase my job
> > focuses
> > > on
> > > > > > >> > throughput
> > > > > > >> > > > to catch up ASAP, while during normal processing
> (without
> > a
> > > > > large
> > > > > > >> > > backlog)
> > > > > > >> > > > Flink is trying to minimize e2e latency.
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > Sure. It will be great to have a way to support this
> > > > > self-adjusting
> > > > > > >> > > mechanism. For now I am not able
> > > > > > >> > > to come up with a good way to support this. I am happy to
> > > > discuss
> > > > > > the
> > > > > > >> > > pros/cons if you can provide
> > > > > > >> > > more detail (e.g. API design) regarding how to support
> this
> > > > > > approach.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > > Am I right here?
> > > > > > >> > > >
> > > > > > >> > > > > there won't exist any "*conflicting* desired
> checkpoint
> > > > > trigger"
> > > > > > >> by
> > > > > > >> > > > definition
> > > > > > >> > > >
> > > > > > >> > > > Ok, arguably there won't be a conflict, but the decision
> > to
> > > > pick
> > > > > > >> > minimum
> > > > > > >> > > > out of the upper bounds might be sub-optimal.
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > As of today, users need checkpoint in order to address two
> > > > goals.
> > > > > > One
> > > > > > >> > goal
> > > > > > >> > > is to upper-bound
> > > > > > >> > > data staleness when there is sink with exactly-once
> > semantics
> > > > > (e.g.
> > > > > > >> > > Paimon), since those sinks
> > > > > > >> > > can only output data when checkpoint is triggered. The
> other
> > > > goal
> > > > > is
> > > > > > >> to
> > > > > > >> > > upper-bound the amount of
> > > > > > >> > > duplicate work needed after failover.
> > > > > > >> > >
> > > > > > >> > > In both cases, users need to upper-bound the checkpointing
> > > > > interval.
> > > > > > >> This
> > > > > > >> > > makes it more intuitive
> > > > > > >> > > for the config to only express the checkpointing interval
> > > > > > upper-bound.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > > > Overall, I am not sure we always want to have a longer
> > > > > > >> checkpointing
> > > > > > >> > > > > interval. That really depends on the specific use-case
> > and
> > > > the
> > > > > > job
> > > > > > >> > > graph.
> > > > > > >> > > >
> > > > > > >> > > > Yes, that's why I proposed something a little bit more
> > > > generic.
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > I am not sure I fully understand the alternative proposal
> > that
> > > > is
> > > > > > >> meant
> > > > > > >> > to
> > > > > > >> > > be more generic. So it is hard for me to evaluate the
> > > pros/cons.
> > > > > > >> > >
> > > > > > >> > > I understand that you preferred for the source operator to
> > use
> > > > the
> > > > > > >> REST
> > > > > > >> > API
> > > > > > >> > > to trigger checkpoints. This sounds
> > > > > > >> > > like a downside since using REST API is not as easy as
> using
> > > the
> > > > > > >> > > programming API proposed in the FLIP.
> > > > > > >> > >
> > > > > > >> > > Can you help explain the generic approach more concretely,
> > > such
> > > > as
> > > > > > the
> > > > > > >> > APIs
> > > > > > >> > > you would suggest introducing? That would
> > > > > > >> > > allow me to evaluate the pros/cons and hopefully pick the
> > best
> > > > > > option.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > > > I believe there can be use-case where
> > > > > > >> > > > > the proposed API is not useful, in which case users
> can
> > > > choose
> > > > > > >> not to
> > > > > > >> > > use
> > > > > > >> > > > > the API without incurring any performance regression.
> > > > > > >> > > >
> > > > > > >> > > > I'm not saying that this proposal is not useful. Just
> that
> > > we
> > > > > > might
> > > > > > >> be
> > > > > > >> > > able
> > > > > > >> > > > to solve this problem in a more flexible manner. If we
> > > > > introduce a
> > > > > > >> > > > partially working solution now at the source level, and
> > > later
> > > > we
> > > > > > >> will
> > > > > > >> > > still
> > > > > > >> > > > need a different solution on another level to cover
> other
> > > use
> > > > > > cases,
> > > > > > >> > that
> > > > > > >> > > > would clog the API and confuse users.
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > Can you explain why this is "partially working"? Is it
> > because
> > > > > there
> > > > > > >> are
> > > > > > >> > > use-cases that should
> > > > > > >> > > be addressed but not already covered by the proposed
> > approach?
> > > > > > >> > >
> > > > > > >> > > If so, can you help explain the use-case that would be
> > useful
> > > to
> > > > > > >> address?
> > > > > > >> > > With concrete
> > > > > > >> > > use-cases in mind, we can pick the API with minimal change
> > to
> > > > > > address
> > > > > > >> > these
> > > > > > >> > > use-cases.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > > @Jing
> > > > > > >> > > >
> > > > > > >> > > > > @Piotr
> > > > > > >> > > > > Just out of curiosity, do you know any real use cases
> > > where
> > > > > > >> real-time
> > > > > > >> > > > data is processed before the backlog?
> > > > > > >> > > >
> > > > > > >> > > > For example join (windowed/temporal) of two tables
> backed
> > > by a
> > > > > > >> hybrid
> > > > > > >> > > > source? I could easily see a scenario where one table
> with
> > > > > little
> > > > > > >> data
> > > > > > >> > > > catches up much more quickly.
> > > > > > >> > > >
> > > > > > >> > > > @Jing and @Dong
> > > > > > >> > > >
> > > > > > >> > > > About the (un)boundness of the input stream. I'm not
> sure
> > if
> > > > > that
> > > > > > >> > should
> > > > > > >> > > > actually matter. Actually the same issue, with two
> > frequent
> > > > > > >> > checkpointing
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > Indeed, I agree with you on this point and prefer not to
> > have
> > > > this
> > > > > > >> > proposal
> > > > > > >> > > depend on the (un)boundness.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > > during a catch up period or when Flink is overloaded,
> > could
> > > > > affect
> > > > > > >> jobs
> > > > > > >> > > > that are purely unbounded, like continuously reading
> from
> > > > Kafka.
> > > > > > >> Even
> > > > > > >> > > more,
> > > > > > >> > > > nothing prevents users from actually storing bounded
> data
> > > in a
> > > > > > Kafka
> > > > > > >> > > topic.
> > > > > > >> > > > Either way, I would like to refine my earlier idea, and
> > > > instead
> > > > > of
> > > > > > >> > using
> > > > > > >> > > > metrics like `pendingRecords`, I think we could switch
> > > between
> > > > > > fast
> > > > > > >> and
> > > > > > >> > > > slow checkpointing intervals based on the information if
> > the
> > > > job
> > > > > > is
> > > > > > >> > > > backpressured or not. My thinking is as follows:
> > > > > > >> > > >
> > > > > > >> > > > As a user, I would like to have my regular fast
> > > checkpointing
> > > > > > >> interval
> > > > > > >> > > for
> > > > > > >> > > > low latency, but the moment my system is not keeping up,
> > if
> > > > the
> > > > > > >> > > > backpressure builds up, or simply we have a huge backlog
> > to
> > > > > > >> reprocess,
> > > > > > >> > > > latency doesn't matter anymore. Only throughput matters.
> > So
> > > I
> > > > > > would
> > > > > > >> > like
> > > > > > >> > > > the checkpointing to slow down.
> > > > > > >> > > >
> > > > > > >> > > > I think this should cover pretty well most of the cases,
> > > what
> > > > do
> > > > > > you
> > > > > > >> > > think?
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > > Thank you for all the comments and this idea. I like this
> > > idea.
> > > > We
> > > > > > >> > actually
> > > > > > >> > > thought about this idea before proposing this FLIP.
> > > > > > >> > >
> > > > > > >> > > In order to make this idea work, we need to come-up with a
> > > good
> > > > > > >> algorithm
> > > > > > >> > > that can dynamically change the checkpointing interval
> based
> > > on
> > > > > the
> > > > > > >> > > "backlog signal", without causing regression w.r.t.
> failover
> > > > time
> > > > > > and
> > > > > > >> > data
> > > > > > >> > > freshness. I find it hard to come up with this algorithm
> due
> > > to
> > > > > > >> > > insufficient "backlog signal".
> > > > > > >> > >
> > > > > > >> > > For the use-case mentioned in the motivation section, the
> > data
> > > > in
> > > > > > the
> > > > > > >> > > source does not have event timestamps to help determine
> the
> > > > amount
> > > > > > of
> > > > > > >> > > backlog. So the only source-of-truth for determining
> backlog
> > > is
> > > > > the
> > > > > > >> > amount
> > > > > > >> > > of data buffered in operators. But the buffer size is
> > > typically
> > > > > > >> chosen to
> > > > > > >> > > be proportional to round-trip-time and throughput. Having
> a
> > > full
> > > > > > >> buffer
> > > > > > >> > > does not necessarily mean that the data is lagging behind.
> > And
> > > > > > >> increasing
> > > > > > >> > > the checkpointing interval with insufficient "backlog
> > signal"
> > > > can
> > > > > > >> have a
> > > > > > >> > > negative impact on data freshness and failover time.
> > > > > > >> > >
> > > > > > >> > > In order to make this idea work, we would need to
> *provide*
> > > that
> > > > > the
> > > > > > >> > > algorithm would not negatively hurt data freshness and
> > > failover
> > > > > time
> > > > > > >> when
> > > > > > >> > > it decides to increase checkpointing intervals. For now I
> > cold
> > > > not
> > > > > > >> come
> > > > > > >> > up
> > > > > > >> > > with such an algorithm.
> > > > > > >> > >
> > > > > > >> > > If this backpressured based behaviour is still not
> enough, I
> > > > would
> > > > > > >> still
> > > > > > >> > > > say that we should provide plugable checkpoint
> triggering
> > > > > > >> controllers
> > > > > > >> > > that
> > > > > > >> > > > would work based on metrics.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > I am not sure how to address the use-case mentioned in the
> > > > > > motivation
> > > > > > >> > > section, with the pluggable checkpoint trigger + metrics.
> > Can
> > > > you
> > > > > > help
> > > > > > >> > > provide the definition of these APIs and kindly explain
> how
> > > that
> > > > > > >> works to
> > > > > > >> > > address the mentioned use-case.
> > > > > > >> > >
> > > > > > >> > > In the mentioned use-case, users want to have two
> different
> > > > > > >> checkpointing
> > > > > > >> > > intervals at different phases of the HybridSource. We
> should
> > > > > provide
> > > > > > >> an
> > > > > > >> > API
> > > > > > >> > > for users to express the extra checkpointing interval in
> > > > addition
> > > > > to
> > > > > > >> the
> > > > > > >> > > existing execution.checkpointing.interval. What would be
> the
> > > > > > >> definition
> > > > > > >> > of
> > > > > > >> > > that API with this alternative approach?
> > > > > > >> > >
> > > > > > >> > > Best,
> > > > > > >> > > Dong
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > > Best,
> > > > > > >> > > > Piotrek
> > > > > > >> > > >
> > > > > > >> > > > czw., 25 maj 2023 o 07:47 Dong Lin <lindon...@gmail.com
> >
> > > > > > >> napisał(a):
> > > > > > >> > > >
> > > > > > >> > > > > Hi Jing,
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks for your comments!
> > > > > > >> > > > >
> > > > > > >> > > > > Regarding the idea of using the existing "boundedness"
> > > > > attribute
> > > > > > >> of
> > > > > > >> > > > > sources, that is indeed something that we might find
> > > > intuitive
> > > > > > >> > > > initially. I
> > > > > > >> > > > > have thought about this idea, but could not find a
> good
> > > way
> > > > to
> > > > > > >> make
> > > > > > >> > it
> > > > > > >> > > > > work. I will try to explain my thoughts and see if we
> > can
> > > > > find a
> > > > > > >> > better
> > > > > > >> > > > > solution.
> > > > > > >> > > > >
> > > > > > >> > > > > Here is my understanding of the idea mentioned above:
> > > > provide
> > > > > a
> > > > > > >> job
> > > > > > >> > > level
> > > > > > >> > > > > config execution.checkpoint.interval.bounded. Flink
> will
> > > use
> > > > > > this
> > > > > > >> as
> > > > > > >> > > the
> > > > > > >> > > > > checkpointing interval whenever there exists at least
> > one
> > > > > > running
> > > > > > >> > > source
> > > > > > >> > > > > which claims it is under the "bounded" stage.
> > > > > > >> > > > >
> > > > > > >> > > > > Note that we can not simply re-use the existing
> > > > "boundedness"
> > > > > > >> > attribute
> > > > > > >> > > > of
> > > > > > >> > > > > source operators. The reason is that for sources such
> as
> > > > MySQL
> > > > > > >> CDC,
> > > > > > >> > its
> > > > > > >> > > > > boundedness can be "continuous_unbounded" because it
> can
> > > run
> > > > > > >> > > > continuously.
> > > > > > >> > > > > But MySQL CDC has two phases internally, where the
> > source
> > > > > needs
> > > > > > to
> > > > > > >> > > first
> > > > > > >> > > > > read a snapshot (with bounded amount of data) and then
> > > read
> > > > a
> > > > > > >> binlog
> > > > > > >> > > > (with
> > > > > > >> > > > > unbounded amount of data).
> > > > > > >> > > > >
> > > > > > >> > > > > As a result, in order to support optimization for
> souces
> > > > like
> > > > > > >> MySQL
> > > > > > >> > > CDC,
> > > > > > >> > > > we
> > > > > > >> > > > > need to expose an API for the source operator to
> declare
> > > > > whether
> > > > > > >> it
> > > > > > >> > is
> > > > > > >> > > > > running at a bounded or continuous_unbounded stage.
> > *This
> > > > > > >> introduces
> > > > > > >> > > the
> > > > > > >> > > > > need to define a new concept named "bounded stage".*
> > > > > > >> > > > >
> > > > > > >> > > > > Then, we will need to *introduce a new contract
> between
> > > > source
> > > > > > >> > > operators
> > > > > > >> > > > > and the Flink runtime*, saying that if there is a
> source
> > > > that
> > > > > > >> claims
> > > > > > >> > it
> > > > > > >> > > > is
> > > > > > >> > > > > running at the bounded stage, then Flink will use the
> "
> > > > > > >> > > > > execution.checkpoint.interval.bounded" as the
> > > checkpointing
> > > > > > >> interval.
> > > > > > >> > > > >
> > > > > > >> > > > > Here are the the concerns I have with this approach:
> > > > > > >> > > > >
> > > > > > >> > > > > - The execution.checkpoint.interval.bounded is a
> > top-level
> > > > > > config,
> > > > > > >> > > > meaning
> > > > > > >> > > > > that every Flink user needs to read about its
> semantics.
> > > In
> > > > > > >> > comparison,
> > > > > > >> > > > the
> > > > > > >> > > > > proposed approach only requires users of specific
> > sources
> > > > > (e.g.
> > > > > > >> > > > > HybridSource, MySQL CDC) to know the new
> source-specific
> > > > > config.
> > > > > > >> > > > >
> > > > > > >> > > > > - It introduces a new top-level concept in Flink to
> > > describe
> > > > > the
> > > > > > >> > > internal
> > > > > > >> > > > > stages of specific sources (e.g. MySQL CDC). In
> > > comparison,
> > > > > the
> > > > > > >> > > proposed
> > > > > > >> > > > > approach only requires users of specific sources (e.g.
> > > > > > >> HybridSource,
> > > > > > >> > > > MySQL
> > > > > > >> > > > > CDC) to know this concept, which not only makes the
> > > > > explanation
> > > > > > >> much
> > > > > > >> > > > > simpler (since they are already using the specific
> > > sources),
> > > > > but
> > > > > > >> also
> > > > > > >> > > > > limits the scope of this new concept (only these users
> > > need
> > > > to
> > > > > > >> know
> > > > > > >> > > this
> > > > > > >> > > > > concept).
> > > > > > >> > > > >
> > > > > > >> > > > > - It is harder to understand the existing config
> > > > > > >> > > > > execution.checkpoint.interval.
> > > > > > >> > > > > Because we need to explain that it is only used when
> > there
> > > > is
> > > > > no
> > > > > > >> > source
> > > > > > >> > > > > with "bounded stage", introducing more if-else for
> this
> > > > > config.
> > > > > > In
> > > > > > >> > > > > comparison, with the proposed approach, the semantics
> of
> > > > > > >> > > > > execution.checkpoint.interval is simpler without
> > if/else,
> > > as
> > > > > it
> > > > > > >> will
> > > > > > >> > > > always
> > > > > > >> > > > > be applied regardless which sources users are using.
> > > > > > >> > > > >
> > > > > > >> > > > > I am happy to discuss if there are better approaches.
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks,
> > > > > > >> > > > > Dong
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Wed, May 24, 2023 at 8:23 AM Jing Ge
> > > > > > >> <j...@ververica.com.invalid>
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hi Yunfeng, Hi Dong
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks for the informative discussion! It is a
> > rational
> > > > > > >> requirement
> > > > > > >> > > to
> > > > > > >> > > > > set
> > > > > > >> > > > > > different checkpoint intervals for different sources
> > in
> > > a
> > > > > > >> > > hybridSource.
> > > > > > >> > > > > The
> > > > > > >> > > > > > tiny downside of this proposal, at least for me, is
> > > that I
> > > > > > have
> > > > > > >> to
> > > > > > >> > > > > > understand the upper-bound definition of the
> interval
> > > and
> > > > > the
> > > > > > >> > > built-in
> > > > > > >> > > > > rule
> > > > > > >> > > > > > for Flink to choose the minimum value between it and
> > the
> > > > > > default
> > > > > > >> > > > interval
> > > > > > >> > > > > > setting. However, afaiac, the intention of this
> > built-in
> > > > > rule
> > > > > > >> is to
> > > > > > >> > > > > > minimize changes in Flink to support the request
> > feature
> > > > > which
> > > > > > >> is a
> > > > > > >> > > > very
> > > > > > >> > > > > > thoughtful move. Thanks for taking care of it. +1
> for
> > > the
> > > > > > >> Proposal.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Another very rough idea was rising in my mind while
> I
> > > was
> > > > > > >> reading
> > > > > > >> > the
> > > > > > >> > > > > FLIP.
> > > > > > >> > > > > > I didn't do a deep dive with related source code
> yet,
> > so
> > > > > > please
> > > > > > >> > > correct
> > > > > > >> > > > > me
> > > > > > >> > > > > > if I am wrong. The use case shows that two different
> > > > > > checkpoint
> > > > > > >> > > > intervals
> > > > > > >> > > > > > should be set for bounded(historical) stream and
> > > > > > unbounded(fresh
> > > > > > >> > > > > real-time)
> > > > > > >> > > > > > stream sources. It is a trade-off between throughput
> > and
> > > > > > >> latency,
> > > > > > >> > > i.e.
> > > > > > >> > > > > > bounded stream with large checkpoint interval for
> > better
> > > > > > >> throughput
> > > > > > >> > > and
> > > > > > >> > > > > > unbounded stream with small checkpoint interval for
> > > lower
> > > > > > >> latency
> > > > > > >> > (in
> > > > > > >> > > > > case
> > > > > > >> > > > > > of failover). As we could see that the different
> > > interval
> > > > > > >> setting
> > > > > > >> > > > depends
> > > > > > >> > > > > > on the boundedness of streams. Since the Source API
> > > > already
> > > > > > has
> > > > > > >> its
> > > > > > >> > > own
> > > > > > >> > > > > > boundedness flag[1], is it possible to define two
> > > interval
> > > > > > >> > > > configurations
> > > > > > >> > > > > > and let Flink automatically set the related one to
> the
> > > > > source
> > > > > > >> based
> > > > > > >> > > on
> > > > > > >> > > > > the
> > > > > > >> > > > > > known boundedness? The interval for bounded stream
> > could
> > > > be
> > > > > > like
> > > > > > >> > > > > > execution.checkpoint.interval.bounded(naming could
> be
> > > > > > >> > reconsidered),
> > > > > > >> > > > and
> > > > > > >> > > > > > the other one for unbounded stream, we could use the
> > > > > existing
> > > > > > >> one
> > > > > > >> > > > > > execution.checkpoint.interval by default, or
> > introduce a
> > > > new
> > > > > > one
> > > > > > >> > like
> > > > > > >> > > > > > execution.checkpoint.interval.unbounded. In this
> way,
> > no
> > > > API
> > > > > > >> change
> > > > > > >> > > is
> > > > > > >> > > > > > required.
> > > > > > >> > > > > >
> > > > > > >> > > > > > @Piotr
> > > > > > >> > > > > > Just out of curiosity, do you know any real use
> cases
> > > > where
> > > > > > >> > real-time
> > > > > > >> > > > > data
> > > > > > >> > > > > > is processed before the backlog? Semantically, the
> > > backlog
> > > > > > >> contains
> > > > > > >> > > > > > historical data that has to be processed before the
> > > > > real-time
> > > > > > >> data
> > > > > > >> > is
> > > > > > >> > > > > > allowed to be processed. Otherwise, up-to-date data
> > will
> > > > be
> > > > > > >> > > overwritten
> > > > > > >> > > > > by
> > > > > > >> > > > > > out-of-date data which turns out to be unexpected
> > > results
> > > > in
> > > > > > >> real
> > > > > > >> > > > > business
> > > > > > >> > > > > > scenarios.
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > Best regards,
> > > > > > >> > > > > > Jing
> > > > > > >> > > > > >
> > > > > > >> > > > > > [1]
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
> > > > > > >> > > > > >
> > > > > > >> > > > > > On Tue, May 23, 2023 at 5:53 PM Dong Lin <
> > > > > lindon...@gmail.com
> > > > > > >
> > > > > > >> > > wrote:
> > > > > > >> > > > > >
> > > > > > >> > > > > > > Hi Piotr,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for the comments. Let me try to understand
> > your
> > > > > > >> concerns
> > > > > > >> > and
> > > > > > >> > > > > > > hopefully address the concerns.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > >> What would happen if there are two (or more)
> > > operator
> > > > > > >> > > coordinators
> > > > > > >> > > > > > with
> > > > > > >> > > > > > > conflicting desired checkpoint trigger behaviour
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > With the proposed change, there won't exist any
> > > > > > "*conflicting*
> > > > > > >> > > > desired
> > > > > > >> > > > > > > checkpoint trigger" by definition. Both job-level
> > > config
> > > > > and
> > > > > > >> the
> > > > > > >> > > > > proposed
> > > > > > >> > > > > > > API upperBoundCheckpointingInterval() means the
> > > > > upper-bound
> > > > > > of
> > > > > > >> > the
> > > > > > >> > > > > > > checkpointing interval. If there are different
> > > > > upper-bounds
> > > > > > >> > > proposed
> > > > > > >> > > > by
> > > > > > >> > > > > > > different source operators and the job-level
> config,
> > > > Flink
> > > > > > >> will
> > > > > > >> > try
> > > > > > >> > > > to
> > > > > > >> > > > > > > periodically trigger checkpoints at the interval
> > > > > > >> corresponding to
> > > > > > >> > > the
> > > > > > >> > > > > > > minimum of all these proposed upper-bounds.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > >> If one source is processing a backlog and the
> > other
> > > > is
> > > > > > >> already
> > > > > > >> > > > > > > processing real time data..
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Overall, I am not sure we always want to have a
> > longer
> > > > > > >> > > checkpointing
> > > > > > >> > > > > > > interval. That really depends on the specific
> > use-case
> > > > and
> > > > > > the
> > > > > > >> > job
> > > > > > >> > > > > graph.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > The proposed API change mechanism for operators
> and
> > > > users
> > > > > to
> > > > > > >> > > specify
> > > > > > >> > > > > > > different checkpoint intervals at different
> periods
> > of
> > > > the
> > > > > > >> job.
> > > > > > >> > > Users
> > > > > > >> > > > > > have
> > > > > > >> > > > > > > the option to use the new API to get better
> > > performance
> > > > in
> > > > > > the
> > > > > > >> > > > use-case
> > > > > > >> > > > > > > specified in the motivation section. I believe
> there
> > > can
> > > > > be
> > > > > > >> > > use-case
> > > > > > >> > > > > > where
> > > > > > >> > > > > > > the proposed API is not useful, in which case
> users
> > > can
> > > > > > choose
> > > > > > >> > not
> > > > > > >> > > to
> > > > > > >> > > > > use
> > > > > > >> > > > > > > the API without incurring any performance
> > regression.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > >> it might be a bit confusing and not user
> friendly
> > > to
> > > > > have
> > > > > > >> > > multiple
> > > > > > >> > > > > > > places that can override the checkpointing
> behaviour
> > > in
> > > > a
> > > > > > >> > different
> > > > > > >> > > > way
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Admittedly, adding more APIs always incur more
> > > > complexity.
> > > > > > But
> > > > > > >> > > > > sometimes
> > > > > > >> > > > > > we
> > > > > > >> > > > > > > have to incur this complexity to address new
> > > use-cases.
> > > > > > Maybe
> > > > > > >> we
> > > > > > >> > > can
> > > > > > >> > > > > see
> > > > > > >> > > > > > if
> > > > > > >> > > > > > > there are more user-friendly way to address this
> > > > use-case.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > >> already implemented and is simple from the
> > > > perspective
> > > > > of
> > > > > > >> > Flink
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Do you mean that the HybridSource operator should
> > > invoke
> > > > > the
> > > > > > >> rest
> > > > > > >> > > API
> > > > > > >> > > > > to
> > > > > > >> > > > > > > trigger checkpoints? The downside of this approach
> > is
> > > > that
> > > > > > it
> > > > > > >> > makes
> > > > > > >> > > > it
> > > > > > >> > > > > > hard
> > > > > > >> > > > > > > for developers of source operators (e.g. MySQL
> CDC,
> > > > > > >> HybridSource)
> > > > > > >> > > to
> > > > > > >> > > > > > > address the target use-case. AFAIK, there is no
> > > existing
> > > > > > case
> > > > > > >> > where
> > > > > > >> > > > we
> > > > > > >> > > > > > > require operator developers to use REST API to do
> > > their
> > > > > job.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Can you help explain the benefit of using REST API
> > > over
> > > > > > using
> > > > > > >> the
> > > > > > >> > > > > > proposed
> > > > > > >> > > > > > > API?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Note that this approach also seems to have the
> same
> > > > > downside
> > > > > > >> > > > mentioned
> > > > > > >> > > > > > > above: "multiple places that can override the
> > > > > checkpointing
> > > > > > >> > > > > behaviour". I
> > > > > > >> > > > > > > am not sure there can be a solution to address the
> > > > target
> > > > > > >> > use-case
> > > > > > >> > > > > > without
> > > > > > >> > > > > > > having multiple places that can affect the
> > > checkpointing
> > > > > > >> > behavior.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > >> check if `pendingRecords` for some source has
> > > > exceeded
> > > > > > the
> > > > > > >> > > > > configured
> > > > > > >> > > > > > > threshold and based on that adjust the
> checkpointing
> > > > > > interval
> > > > > > >> > > > > accordingly
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > I am not sure this approach can address the target
> > > > > use-case
> > > > > > >> in a
> > > > > > >> > > > better
> > > > > > >> > > > > > > way. In the target use-case, we would like to
> > > > HybridSource
> > > > > > to
> > > > > > >> > > trigger
> > > > > > >> > > > > > > checkpoint more frequently when it is read the
> Kafka
> > > > > Source
> > > > > > >> (than
> > > > > > >> > > > when
> > > > > > >> > > > > it
> > > > > > >> > > > > > > is reading the HDFS source). We would need to set
> a
> > > flag
> > > > > for
> > > > > > >> the
> > > > > > >> > > > > > checkpoint
> > > > > > >> > > > > > > trigger to know which source the HybridSource is
> > > reading
> > > > > > from.
> > > > > > >> > But
> > > > > > >> > > > IMO
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > approach is less intuitive and more complex than
> > > having
> > > > > the
> > > > > > >> > > > > HybridSource
> > > > > > >> > > > > > > invoke upperBoundCheckpointingInterval() directly
> > once
> > > > it
> > > > > is
> > > > > > >> > > reading
> > > > > > >> > > > > > Kafka
> > > > > > >> > > > > > > Source.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Maybe I did not understand the alternative
> approach
> > > > > rightly.
> > > > > > >> I am
> > > > > > >> > > > happy
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > discuss more on this topic. WDYT?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Best,
> > > > > > >> > > > > > > Dong
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <
> > > > > > >> > > > pnowoj...@apache.org>
> > > > > > >> > > > > > > wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Hi,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks for the proposal. However, are you sure
> > that
> > > > the
> > > > > > >> > > > > > > > OperatorCoordinator is the right place to place
> > such
> > > > > > logic?
> > > > > > >> > What
> > > > > > >> > > > > would
> > > > > > >> > > > > > > > happen if there are two (or more) operator
> > > > coordinators
> > > > > > with
> > > > > > >> > > > > > conflicting
> > > > > > >> > > > > > > > desired checkpoint trigger behaviour? If one
> > source
> > > is
> > > > > > >> > > processing a
> > > > > > >> > > > > > > backlog
> > > > > > >> > > > > > > > and the other is already processing real time
> > data,
> > > I
> > > > > > would
> > > > > > >> > > assume
> > > > > > >> > > > > that
> > > > > > >> > > > > > > in
> > > > > > >> > > > > > > > most use cases you would like to still have the
> > > longer
> > > > > > >> > > > checkpointing
> > > > > > >> > > > > > > > interval, not the shorter one. Also apart from
> > that,
> > > > it
> > > > > > >> might
> > > > > > >> > be
> > > > > > >> > > a
> > > > > > >> > > > > bit
> > > > > > >> > > > > > > > confusing and not user friendly to have multiple
> > > > places
> > > > > > that
> > > > > > >> > can
> > > > > > >> > > > > > override
> > > > > > >> > > > > > > > the checkpointing behaviour in a different way.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > FIY in the past, we had some discussions about
> > > similar
> > > > > > >> requests
> > > > > > >> > > and
> > > > > > >> > > > > > back
> > > > > > >> > > > > > > > then we chose to keep the system simpler, and
> > > exposed
> > > > a
> > > > > > more
> > > > > > >> > > > generic
> > > > > > >> > > > > > REST
> > > > > > >> > > > > > > > API checkpoint triggering mechanism. I know that
> > > > having
> > > > > to
> > > > > > >> > > > implement
> > > > > > >> > > > > > such
> > > > > > >> > > > > > > > logic outside of Flink and having to call REST
> > calls
> > > > to
> > > > > > >> trigger
> > > > > > >> > > > > > > checkpoints
> > > > > > >> > > > > > > > might not be ideal, but that's already
> implemented
> > > and
> > > > > is
> > > > > > >> > simple
> > > > > > >> > > > from
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > perspective of Flink.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I don't know, maybe instead of adding this logic
> > to
> > > > > > operator
> > > > > > >> > > > > > > coordinators,
> > > > > > >> > > > > > > > `CheckpointCoordinator` should have a pluggable
> > > > > > >> > > > `CheckpointTrigger`,
> > > > > > >> > > > > > that
> > > > > > >> > > > > > > > the user could configure like a
> `MetricReporter`.
> > > The
> > > > > > >> default
> > > > > > >> > one
> > > > > > >> > > > > would
> > > > > > >> > > > > > > be
> > > > > > >> > > > > > > > just periodically triggering checkpoints. Maybe
> > > > > > >> > > > > > > > `BacklogDynamicCheckpointTrigger` could look at
> > > > > > metrics[1],
> > > > > > >> > check
> > > > > > >> > > > if
> > > > > > >> > > > > > > > `pendingRecords` for some source has exceeded
> the
> > > > > > configured
> > > > > > >> > > > > threshold
> > > > > > >> > > > > > > and
> > > > > > >> > > > > > > > based on that adjust the checkpointing interval
> > > > > > accordingly?
> > > > > > >> > This
> > > > > > >> > > > > would
> > > > > > >> > > > > > > at
> > > > > > >> > > > > > > > least address some of my concerns.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > WDYT?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Best,
> > > > > > >> > > > > > > > Piotrek
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > [1]
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > wt., 9 maj 2023 o 19:11 Yunfeng Zhou <
> > > > > > >> > > flink.zhouyunf...@gmail.com>
> > > > > > >> > > > > > > > napisał(a):
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >> Hi all,
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> Dong(cc'ed) and I are opening this thread to
> > > discuss
> > > > > our
> > > > > > >> > > proposal
> > > > > > >> > > > to
> > > > > > >> > > > > > > >> support dynamically triggering checkpoints from
> > > > > > operators,
> > > > > > >> > which
> > > > > > >> > > > has
> > > > > > >> > > > > > > >> been documented in FLIP-309
> > > > > > >> > > > > > > >> <
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> > > > > > >> > > > > > > >> >.
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> With the help of the ability proposed in this
> > FLIP,
> > > > > users
> > > > > > >> > could
> > > > > > >> > > > > > > >> improve the performance of their Flink job in
> > cases
> > > > > like
> > > > > > >> when
> > > > > > >> > > the
> > > > > > >> > > > > job
> > > > > > >> > > > > > > >> needs to process both historical batch data and
> > > > > real-time
> > > > > > >> > > > streaming
> > > > > > >> > > > > > > >> data, by adjusting the checkpoint triggerings
> in
> > > > > > different
> > > > > > >> > > phases
> > > > > > >> > > > > of a
> > > > > > >> > > > > > > >> HybridSource or CDC source.
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> This proposal would be a fundamental component
> in
> > > the
> > > > > > >> effort
> > > > > > >> > to
> > > > > > >> > > > > > > >> further unify Flink's batch and stream
> processing
> > > > > > ability.
> > > > > > >> > > Please
> > > > > > >> > > > > feel
> > > > > > >> > > > > > > >> free to reply to this email thread and share
> with
> > > us
> > > > > your
> > > > > > >> > > > opinions.
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> Best regards.
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> Dong and Yunfeng
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to