Scaling with unaligned checkpoints might be a necessity.

Let's assume the job failed due to a lost TaskManager, but no new
TaskManager becomes available.
In that case we need to scale down based on the latest complete checkpoint,
because we cannot produce a new checkpoint.


On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com>
wrote:

> +1 I think we are on the same page Stephan.
>
> Rescaling on unaligned checkpoint sounds challenging and a bit
> unnecessary. No?
> Why not sticking to aligned snapshots for live reconfiguration/rescaling?
> It’s a pretty rare operation and it would simplify things by a lot.
> Everything can be “staged” upon alignment including replacing channels and
> tasks.
>
> -Paris
>
> > On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi all!
> >
> > Yes, the first proposal of "unaligend checkpoints" (probably two years
> back
> > now) drew a major inspiration from Chandy Lamport, as did actually the
> > original checkpointing algorithm.
> >
> > "Logging data between first and last barrier" versus "barrier jumping
> over
> > buffer and storing those buffers" is pretty close same.
> > However, there are a few nice benefits of the proposal of unaligned
> > checkpoints over Chandy-Lamport.
> >
> > *## Benefits of Unaligned Checkpoints*
> >
> > (1) It is very similar to the original algorithm (can be seen an an
> > optional feature purely in the network stack) and thus can share lot's of
> > code paths.
> >
> > (2) Less data stored. If we make the "jump over buffers" part timeout
> based
> > (for example barrier overtakes buffers if not flushed within 10ms) then
> > checkpoints are in the common case of flowing pipelines aligned without
> > in-flight data. Only back pressured cases store some in-flight data,
> which
> > means we don't regress in the common case and only fix the back pressure
> > case.
> >
> > (3) Faster checkpoints. Chandy Lamport still waits for all barriers to
> > arrive naturally, logging on the way. If data processing is slow, this
> can
> > still take quite a while.
> >
> > ==> I think both these points are strong reasons to not change the
> > mechanism away from "trigger sources" and start with CL-style "trigger
> all".
> >
> >
> > *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*
> >
> > We can think about something like "take state snapshot on first barrier"
> > and then store buffers until the other barriers arrive. Inside the
> network
> > stack, barriers could still overtake and persist buffers.
> > The benefit would be less latency increase in the channels which already
> > have received barriers.
> > However, as mentioned before, not prioritizing the inputs from which
> > barriers are still missing can also have an adverse effect.
> >
> >
> > *## Concerning upgrades*
> >
> > I think it is a fair restriction to say that upgrades need to happen on
> > aligned checkpoints. It is a rare enough operation.
> >
> >
> > *## Concerning re-scaling (changing parallelism)*
> >
> > We need to support that on unaligned checkpoints as well. There are
> several
> > feature proposals about automatic scaling, especially down scaling in
> case
> > of missing resources. The last snapshot might be a regular checkpoint, so
> > all checkpoints need to support rescaling.
> >
> >
> > *## Concerning end-to-end checkpoint duration and "trigger sources"
> versus
> > "trigger all"*
> >
> > I think for the end-to-end checkpoint duration, an "overtake buffers"
> > approach yields faster checkpoints, as mentioned above (Chandy Lamport
> > logging still needs to wait for barrier to flow).
> >
> > I don't see the benefit of a "trigger all tasks via RPC concurrently"
> > approach. Bear in mind that it is still a globally coordinated approach
> and
> > you need to wait for the global checkpoint to complete before committing
> > any side effects.
> > I believe that the checkpoint time is more determined by the state
> > checkpoint writing, and the global coordination and metadata commit, than
> > by the difference in alignment time between "trigger from source and jump
> > over buffers" versus "trigger all tasks concurrently".
> >
> > Trying to optimize a few tens of milliseconds out of the network stack
> > sends (and changing the overall checkpointing approach completely for
> that)
> > while staying with a globally coordinated checkpoint will send us down a
> > path to a dead end.
> >
> > To really bring task persistence latency down to 10s of milliseconds (so
> we
> > can frequently commit in sinks), we need to take an approach without any
> > global coordination. Tasks need to establish a persistent recovery point
> > individually and at their own discretion, only then can it be frequent
> > enough. To get there, they would need to decouple themselves from the
> > predecessor and successor tasks (via something like persistent channels).
> > This is a different discussion, though, somewhat orthogonal to this one
> > here.
> >
> > Best,
> > Stephan
> >
> >
> > On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
> >
> >> Hi again,
> >>
> >> Zhu Zhu let me think about this more. Maybe as Paris is writing, we do
> not
> >> need to block any channels at all, at least assuming credit base flow
> >> control. Regarding what should happen with the following checkpoint is
> >> another question. Also, should we support concurrent checkpoints and
> >> subsuming checkpoints as we do now? Maybe not…
> >>
> >> Paris
> >>
> >> Re
> >> I. 2. a) and b) - yes, this would have to be taken into an account
> >> I. 2. c) and IV. 2. - without those, end to end checkpoint time will
> >> probably be longer than it could be. It might affect external systems.
> For
> >> example Kafka, which automatically time outs lingering transactions, and
> >> for us, the transaction time is equal to the time between two
> checkpoints.
> >>
> >> II 1. - I’m confused. To make things straight. Flink is currently
> >> snapshotting once it receives all of the checkpoint barriers from all of
> >> the input channels and only then it broadcasts the checkpoint barrier
> down
> >> the stream. And this is correct from exactly-once perspective.
> >>
> >> As far as I understand, your proposal based on Chandy Lamport algorithm,
> >> is snapshotting the state of the operator on the first checkpoint
> barrier,
> >> which also looks correct to me.
> >>
> >> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
> >>
> >> V. Yes, we still need aligned checkpoints, as they are easier for state
> >> migration and upgrades.
> >>
> >> Piotrek
> >>
> >>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com>
> wrote:
> >>>
> >>> Now I see a little more clearly what you have in mind. Thanks for the
> >> explanation!
> >>> There are a few intermixed concepts here, some how to do with
> >> correctness some with performance.
> >>> Before delving deeper I will just enumerate a few things to make myself
> >> a little more helpful if I can.
> >>>
> >>> I. Initiation
> >>> -------------
> >>>
> >>> 1. RPC to sources only is a less intrusive way to initiate snapshots
> >> since you utilize better pipeline parallelism (only a small subset of
> tasks
> >> is running progressively the protocol at a time, if snapshotting is
> async
> >> the overall overhead might not even be observable).
> >>>
> >>> 2. If we really want an RPC to all initiation take notice of the
> >> following implications:
> >>>
> >>>      a. (correctness) RPC calls are not guaranteed to arrive in every
> >> task before a marker from a preceding task.
> >>>
> >>>      b. (correctness) Either the RPC call OR the first arriving marker
> >> should initiate the algorithm. Whichever comes first. If you only do it
> per
> >> RPC call then you capture a "late" state that includes side effects of
> >> already logged events.
> >>>
> >>>      c. (performance) Lots of IO will be invoked at the same time on
> >> the backend store from all tasks. This might lead to high congestion in
> >> async snapshots.
> >>>
> >>> II. Capturing State First
> >>> -------------------------
> >>>
> >>> 1. (correctness) Capturing state at the last marker sounds incorrect to
> >> me (state contains side effects of already logged events based on the
> >> proposed scheme). This results into duplicate processing. No?
> >>>
> >>> III. Channel Blocking / "Alignment"
> >>> -----------------------------------
> >>>
> >>> 1. (performance?) What is the added benefit? We dont want a "complete"
> >> transactional snapshot, async snapshots are purely for failure-recovery.
> >> Thus, I dont see why this needs to be imposed at the expense of
> >> performance/throughput. With the proposed scheme the whole dataflow
> anyway
> >> enters snapshotting/logging mode so tasks more or less snapshot
> >> concurrently.
> >>>
> >>> IV Marker Bypassing
> >>> -------------------
> >>>
> >>> 1. (correctness) This leads to equivalent in-flight snapshots so with
> >> some quick thinking  correct. I will try to model this later and get
> back
> >> to you in case I find something wrong.
> >>>
> >>> 2. (performance) It also sounds like a meaningful optimisation! I like
> >> thinking of this as a push-based snapshot. i.e., the producing task
> somehow
> >> triggers forward a consumer/channel to capture its state. By example
> >> consider T1 -> |marker t1| -> T2.
> >>>
> >>> V. Usage of "Async" Snapshots
> >>> ---------------------
> >>>
> >>> 1. Do you see this as a full replacement of "full" aligned
> >> snapshots/savepoints? In my view async shanpshots will be needed from
> time
> >> to time but not as frequently. Yet, it seems like a valid approach
> solely
> >> for failure-recovery on the same configuration. Here's why:
> >>>
> >>>      a. With original snapshotting there is a strong duality between
> >>>      a stream input (offsets) and committed side effects (internal
> >> states and external commits to transactional sinks). While in the async
> >> version, there are uncommitted operations (inflight records). Thus, you
> >> cannot use these snapshots for e.g., submitting sql queries with
> snapshot
> >> isolation. Also, the original snapshotting gives a lot of potential for
> >> flink to make proper transactional commits externally.
> >>>
> >>>      b. Reconfiguration is very tricky, you probably know that better.
> >> Inflight channel state is no longer valid in a new configuration (i.e.,
> new
> >> dataflow graph, new operators, updated operator logic, different
> channels,
> >> different parallelism)
> >>>
> >>> 2. Async snapshots can also be potentially useful for monitoring the
> >> general health of a dataflow since they can be analyzed by the task
> manager
> >> about the general performance of a job graph and spot bottlenecks for
> >> example.
> >>>
> >>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> Thomas:
> >>>> There are no Jira tickets yet (or maybe there is something very old
> >> somewhere). First we want to discuss it, next present FLIP and at last
> >> create tickets :)
> >>>>
> >>>>> if I understand correctly, then the proposal is to not block any
> >>>>> input channel at all, but only log data from the backpressured
> channel
> >> (and
> >>>>> make it part of the snapshot) until the barrier arrives
> >>>>
> >>>> I would guess that it would be better to block the reads, unless we
> can
> >> already process the records from the blocked channel…
> >>>>
> >>>> Paris:
> >>>>
> >>>> Thanks for the explanation Paris. I’m starting to understand this more
> >> and I like the idea of snapshotting the state of an operator before
> >> receiving all of the checkpoint barriers - this would allow more things
> to
> >> happen at the same time instead of sequentially. As Zhijiang has pointed
> >> out there are some things not considered in your proposal: overtaking
> >> output buffers, but maybe those things could be incorporated together.
> >>>>
> >>>> Another thing is that from the wiki description I understood that the
> >> initial checkpointing is not initialised by any checkpoint barrier, but
> by
> >> an independent call/message from the Observer. I haven’t played with
> this
> >> idea a lot, but I had some discussion with Nico and it seems that it
> might
> >> work:
> >>>>
> >>>> 1. JobManager sends and RPC “start checkpoint” to all tasks
> >>>> 2. Task (with two input channels l1 and l2) upon receiving RPC from
> 1.,
> >> takes a snapshot of it's state and:
> >>>> a) broadcast checkpoint barrier down the stream to all channels (let’s
> >> ignore for a moment potential for this barrier to overtake the buffer
> >> output data)
> >>>> b) for any input channel for which it hasn’t yet received checkpoint
> >> barrier, the data are being added to the checkpoint
> >>>> c) once a channel (for example l1) receives a checkpoint barrier, the
> >> Task blocks reads from that channel (?)
> >>>> d) after all remaining channels (l2) receive checkpoint barriers, the
> >> Task  first has to process the buffered data after that it can unblock
> the
> >> reads from the channels
> >>>>
> >>>> Checkpoint barriers do not cascade/flow through different tasks here.
> >> Checkpoint barrier emitted from Task1, reaches only the immediate
> >> downstream Tasks. Thanks to this setup, total checkpointing time is not
> sum
> >> of checkpointing times of all Tasks one by one, but more or less max of
> the
> >> slowest Tasks. Right?
> >>>>
> >>>> Couple of intriguing thoughts are:
> >>>> 3. checkpoint barriers overtaking the output buffers
> >>>> 4. can we keep processing some data (in order to not waste CPU cycles)
> >> after we have taking the snapshot of the Task. I think we could.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
> >>>>>
> >>>>> Great discussion! I'm excited that this is already under
> >> consideration! Are
> >>>>> there any JIRAs or other traces of discussion to follow?
> >>>>>
> >>>>> Paris, if I understand correctly, then the proposal is to not block
> any
> >>>>> input channel at all, but only log data from the backpressured
> channel
> >> (and
> >>>>> make it part of the snapshot) until the barrier arrives? This is
> >>>>> intriguing. But probably there is also a benefit of to not continue
> >> reading
> >>>>> I1 since that could speed up retrieval from I2. Also, if the user
> code
> >> is
> >>>>> the cause of backpressure, this would avoid pumping more data into
> the
> >>>>> process function.
> >>>>>
> >>>>> Thanks,
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com
> >> .invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Paris,
> >>>>>>
> >>>>>> Thanks for the detailed sharing. And I think it is very similar with
> >> the
> >>>>>> way of overtaking we proposed before.
> >>>>>>
> >>>>>> There are some tiny difference:
> >>>>>> The way of overtaking might need to snapshot all the input/output
> >> queues.
> >>>>>> Chandy Lamport seems only need to snaphost (n-1) input channels
> after
> >> the
> >>>>>> first barrier arrives, which might reduce the state sizea bit. But
> >> normally
> >>>>>> there should be less buffers for the first input channel with
> barrier.
> >>>>>> The output barrier still follows with regular data stream in Chandy
> >>>>>> Lamport, the same way as current flink. For overtaking way, we need
> >> to pay
> >>>>>> extra efforts to make barrier transport firstly before outque queue
> on
> >>>>>> upstream side, and change the way of barrier alignment based on
> >> receiving
> >>>>>> instead of current reading on downstream side.
> >>>>>> In the backpressure caused by data skew, the first barrier in almost
> >> empty
> >>>>>> input channel should arrive much eariler than the last heavy load
> >> input
> >>>>>> channel, so the Chandy Lamport could benefit well. But for the case
> >> of all
> >>>>>> balanced heavy load input channels, I mean the first arrived barrier
> >> might
> >>>>>> still take much time, then the overtaking way could still fit well
> to
> >> speed
> >>>>>> up checkpoint.
> >>>>>> Anyway, your proposed suggestion is helpful on my side, especially
> >>>>>> considering some implementation details .
> >>>>>>
> >>>>>> Best,
> >>>>>> Zhijiang
> >>>>>> ------------------------------------------------------------------
> >>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
> >>>>>> Send Time:2019年8月13日(星期二) 14:03
> >>>>>> To:dev <dev@flink.apache.org>
> >>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
> >>>>>> Subject:Re: Checkpointing under backpressure
> >>>>>>
> >>>>>> yes! It’s quite similar I think.  Though mind that the devil is in
> the
> >>>>>> details, i.e., the temporal order actions are taken.
> >>>>>>
> >>>>>> To clarify, let us say you have a task T with two input channels I1
> >> and I2.
> >>>>>> The Chandy Lamport execution flow is the following:
> >>>>>>
> >>>>>> 1) T receives barrier from  I1 and...
> >>>>>> 2)  ...the following three actions happen atomically
> >>>>>> I )  T snapshots its state T*
> >>>>>> II)  T forwards marker to its outputs
> >>>>>> III) T starts logging all events of I2 (only) into a buffer M*
> >>>>>> - Also notice here that T does NOT block I1 as it does in aligned
> >>>>>> snapshots -
> >>>>>> 3) Eventually T receives barrier from I2 and stops recording events.
> >> Its
> >>>>>> asynchronously captured snapshot is now complete: {T*,M*}.
> >>>>>> Upon recovery all messages of M* should be replayed in FIFO order.
> >>>>>>
> >>>>>> With this approach alignment does not create a deadlock situation
> >> since
> >>>>>> anyway 2.II happens asynchronously and messages can be logged as
> well
> >>>>>> asynchronously during the process of the snapshot. If there is
> >>>>>> back-pressure in a pipeline the cause is most probably not this
> >> algorithm.
> >>>>>>
> >>>>>> Back to your observation, the answer : yes and no.  In your network
> >> model,
> >>>>>> I can see the logic of “logging” and “committing” a final snapshot
> >> being
> >>>>>> provided by the channel implementation. However, do mind that the
> >> first
> >>>>>> barrier always needs to go “all the way” to initiate the Chandy
> >> Lamport
> >>>>>> algorithm logic.
> >>>>>>
> >>>>>> The above flow has been proven using temporal logic in my phd thesis
> >> in
> >>>>>> case you are interested about the proof.
> >>>>>> I hope this helps a little clarifying things. Let me know if there
> is
> >> any
> >>>>>> confusing point to disambiguate. I would be more than happy to help
> >> if I
> >>>>>> can.
> >>>>>>
> >>>>>> Paris
> >>>>>>
> >>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com>
> >> wrote:
> >>>>>>>
> >>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots don’t
> >> you
> >>>>>> still have to wait for the “checkpoint barrier” to arrive in order
> to
> >> know
> >>>>>> when have you already received all possible messages from the
> upstream
> >>>>>> tasks/operators? So instead of processing the “in flight” messages
> >> (as the
> >>>>>> Flink is doing currently), you are sending them to an “observer”?
> >>>>>>>
> >>>>>>> In that case, that’s sounds similar to “checkpoint barriers
> >> overtaking
> >>>>>> in flight records” (aka unaligned checkpoints). Just for us, the
> >> observer
> >>>>>> is a snapshot state.
> >>>>>>>
> >>>>>>> Piotrek
> >>>>>>>
> >>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
> >>>>>>>>
> >>>>>>>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport
> >> snapshots
> >>>>>> [1] would help out solve this problem more elegantly without
> >> sacrificing
> >>>>>> correctness.
> >>>>>>>> - They do not need alignment, only (async) logging for in-flight
> >>>>>> records between the time the first barrier is processed until the
> last
> >>>>>> barrier arrives in a task.
> >>>>>>>> - They work fine for failure recovery as long as logged records
> are
> >>>>>> replayed on startup.
> >>>>>>>>
> >>>>>>>> Flink’s “alligned” savepoints would probably be still necessary
> for
> >>>>>> transactional sink commits + any sort of reconfiguration (e.g.,
> >> rescaling,
> >>>>>> updating the logic of operators to evolve an application etc.).
> >>>>>>>>
> >>>>>>>> I don’t completely understand the “overtaking” approach but if you
> >> have
> >>>>>> a concrete definition I would be happy to check it out and help if I
> >> can!
> >>>>>>>> Mind that Chandy-Lamport essentially does this by logging things
> in
> >>>>>> pending channels in a task snapshot before the barrier arrives.
> >>>>>>>>
> >>>>>>>> -Paris
> >>>>>>>>
> >>>>>>>> [1]
> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> >> <
> >>>>>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm>
> >>>>>>>>
> >>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com>
> >> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Thomas,
> >>>>>>>>>
> >>>>>>>>> As Zhijiang has responded, we are now in the process of
> discussing
> >> how
> >>>>>> to address this issue and one of the solution that we are discussing
> >> is
> >>>>>> exactly what you are proposing: checkpoint barriers overtaking the
> in
> >>>>>> flight data and make the in flight data part of the checkpoint.
> >>>>>>>>>
> >>>>>>>>> If everything works well, we will be able to present result of
> our
> >>>>>> discussions on the dev mailing list soon.
> >>>>>>>>>
> >>>>>>>>> Piotrek
> >>>>>>>>>
> >>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com
> >> .INVALID>
> >>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Thomas,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for proposing this concern. The barrier alignment takes
> >> long
> >>>>>> time in backpressure case which could cause several problems:
> >>>>>>>>>> 1. Checkpoint timeout as you mentioned.
> >>>>>>>>>> 2. The recovery cost is high once failover, because much data
> >> needs
> >>>>>> to be replayed.
> >>>>>>>>>> 3. The delay for commit-based sink is high in exactly-once.
> >>>>>>>>>>
> >>>>>>>>>> For credit-based flow control from release-1.5, the amount of
> >>>>>> in-flighting buffers before barrier alignment is reduced, so we
> could
> >> get a
> >>>>>> bit
> >>>>>>>>>> benefits from speeding checkpoint aspect.
> >>>>>>>>>>
> >>>>>>>>>> In release-1.8, I guess we did not suspend the channels which
> >> already
> >>>>>> received the barrier in practice. But actually we ever did the
> >> similar thing
> >>>>>>>>>> to speed barrier alighment before. I am not quite sure that
> >>>>>> release-1.8 covers this feature. There were some relevant
> discussions
> >> under
> >>>>>> jira [1].
> >>>>>>>>>>
> >>>>>>>>>> For release-1.10, the community is now discussing the feature of
> >>>>>> unaligned checkpoint which is mainly for resolving above concerns.
> The
> >>>>>> basic idea
> >>>>>>>>>> is to make barrier overtakes the output/input buffer queue to
> >> speed
> >>>>>> alignment, and snapshot the input/output buffers as part of
> checkpoint
> >>>>>> state. The
> >>>>>>>>>> details have not confirmed yet and is still under discussion.
> >> Wish we
> >>>>>> could make some improvments for the release-1.10.
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Zhijiang
> >>>>>>>>>>
> ------------------------------------------------------------------
> >>>>>>>>>> From:Thomas Weise <t...@apache.org>
> >>>>>>>>>> Send Time:2019年8月12日(星期一) 21:38
> >>>>>>>>>> To:dev <dev@flink.apache.org>
> >>>>>>>>>> Subject:Checkpointing under backpressure
> >>>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> One of the major operational difficulties we observe with Flink
> >> are
> >>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for both
> >>>>>> confirmation
> >>>>>>>>>> of my understanding of the current behavior as well as pointers
> >> for
> >>>>>> future
> >>>>>>>>>> improvement work:
> >>>>>>>>>>
> >>>>>>>>>> Prior to introduction of credit based flow control in the
> network
> >>>>>> stack [1]
> >>>>>>>>>> [2], checkpoint barriers would back up with the data for all
> >> logical
> >>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the buffers
> are
> >>>>>>>>>> controlled per channel, and checkpoint barriers are only held
> >> back for
> >>>>>>>>>> channels that have backpressure, while others can continue
> >> processing
> >>>>>>>>>> normally. However, checkpoint barriers still cannot "overtake
> >> data",
> >>>>>>>>>> therefore checkpoint alignment remains affected for the channel
> >> with
> >>>>>>>>>> backpressure, with the potential for slow checkpointing and
> >> timeouts.
> >>>>>>>>>> Albeit the delay of barriers would be capped by the maximum
> >> in-transit
> >>>>>>>>>> buffers per channel, resulting in an improvement compared to
> >> previous
> >>>>>>>>>> versions of Flink. Also, the backpressure based checkpoint
> >> alignment
> >>>>>> can
> >>>>>>>>>> help the barrier advance faster on the receiver side (by
> >> suspending
> >>>>>>>>>> channels that have already delivered the barrier). Is that
> >> accurate
> >>>>>> as of
> >>>>>>>>>> Flink 1.8?
> >>>>>>>>>>
> >>>>>>>>>> What appears to be missing to completely unblock checkpointing
> is
> >> a
> >>>>>>>>>> mechanism for checkpoints to overtake the data. That would help
> in
> >>>>>>>>>> situations where the processing itself is the bottleneck and
> >>>>>> prioritization
> >>>>>>>>>> in the network stack alone cannot address the barrier delay. Was
> >>>>>> there any
> >>>>>>>>>> related discussion? One possible solution would be to drain
> >> incoming
> >>>>>> data
> >>>>>>>>>> till the barrier and make it part of the checkpoint instead of
> >>>>>> processing
> >>>>>>>>>> it. This is somewhat related to asynchronous processing, but I'm
> >>>>>> thinking
> >>>>>>>>>> more of a solution that is automated in the Flink runtime for
> the
> >>>>>>>>>> backpressure scenario only.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Thomas
> >>>>>>>>>>
> >>>>>>>>>> [1]
> https://flink.apache.org/2019/06/05/flink-network-stack.html
> >>>>>>>>>> [2]
> >>>>>>>>>>
> >>>>>>
> >>
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> >>
>
>

Reply via email to