Sorry, incorrect link; please follow [1].

[1]
https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3CCAGZNd0FgVL0oDQJHpBwJ1Ha8QevsVG0FHixdet11tLhW2p-2hg%40mail.gmail.com%3E

On Wed, Oct 2, 2019 at 3:44 PM Arvid Heise <ar...@ververica.com> wrote:

> FYI, we published FLIP-76 to address the issue and discussion has been
> opened in [1].
>
> Looking forward to your feedback,
>
> Arvid
>
> [1]
> https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser
>
> On Thu, Aug 15, 2019 at 9:43 AM Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
>> Hi,
>>     Many thanks for the great points!
>>
>>     Regarding prioritizing inputs, from another point of view, I think it
>> might not cause other bad effects, since we do not need to totally block
>> the channels that have seen barriers after the operator has taken its
>> snapshot. After the snapshot, if the channels that have not seen barriers
>> have buffers, we could first log and process these buffers, and if they
>> do not have buffers, we can still process the buffers from the channels
>> that have seen barriers. Therefore, it seems that prioritizing inputs
>> should be able to accelerate the checkpoint without other bad effects.
>>
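A minimal sketch of the input selection described above, assuming each input channel is a simple FIFO of buffers. The names `pick_channel`, `channels`, and `seen_barrier` are hypothetical and are not Flink APIs:

```python
from collections import deque


def pick_channel(channels, seen_barrier):
    """Return the index of the channel to read next, or None if all are empty.

    channels: list of deques of buffered records, one per input channel.
    seen_barrier: set of channel indices whose barrier has already arrived.
    """
    # First choice: channels still waiting for their barrier. Their buffers
    # have to be logged into the checkpoint, so draining them finishes it sooner.
    for i, buf in enumerate(channels):
        if i not in seen_barrier and buf:
            return i
    # Fallback: no pending channel has data, so keep the task busy with
    # channels that already delivered their barrier (they are not blocked).
    for i, buf in enumerate(channels):
        if i in seen_barrier and buf:
            return i
    return None
```

With the barrier seen only on channel 0, channel 1 is read first for as long as it has data; channel 0 is read only when channel 1 is empty.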
>>    And @zhijiang, regarding making the unaligned checkpoint the only
>> mechanism for all cases, I still think we should allow a configurable
>> timeout after receiving the first barrier so that the channels may get
>> "drained" during the timeout, as pointed out by Stephan. With such a
>> timeout, we would very likely not need to snapshot the input buffers,
>> which would be very similar to the current aligned checkpoint mechanism.
>>
>> Best,
>> Yun
>>
>>
>> ------------------------------------------------------------------
>> From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
>> Send Time:2019 Aug. 15 (Thu.) 02:22
>> To:dev <dev@flink.apache.org>
>> Subject:Re: Checkpointing under backpressure
>>
>> > For the checkpoint to complete, any buffer that
>> > arrived prior to the barrier would have to be part of the checkpointed
>> state.
>>
>> Yes, I agree.
>>
>> > So wouldn't it be important to finish persisting these buffers as fast
>> as
>> > possible by prioritizing respective inputs? The task won't be able to
>> > process records from the inputs that have seen the barrier fast when it
>> is
>> > already backpressured (or causing the backpressure).
>>
>> My previous understanding of prioritizing inputs was from the aspect of
>> task processing after the state snapshot. From the aspect of persisting
>> buffers, I think it might depend on how we implement it.
>> If we only tag/reference which buffers in the inputs are part of the
>> state, and do the actual persisting work asynchronously, then the
>> already tagged buffers could be processed by the task w/o priority.
>> Only after all the persisting work is done would the task report the
>> finished checkpoint on its side to the coordinator. The key point is how
>> we implement this so that the task can continue processing buffers as
>> soon as possible.
>>
>> Thanks for the further explanation of the requirements for speeding up
>> checkpoints in the backpressure scenario. Making the savepoint finish
>> quickly and then tuning the settings to avoid backpressure is really a
>> practical case. I think this solution could cover this concern.
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Thomas Weise <t...@apache.org>
>> Send Time:2019 Aug. 14 (Wed.) 19:48
>> To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
>> Subject:Re: Checkpointing under backpressure
>>
>> -->
>>
>> On Wed, Aug 14, 2019 at 10:23 AM zhijiang
>> <wangzhijiang...@aliyun.com.invalid> wrote:
>>
>> > Thanks for these great points and discussions!
>> >
>> > 1. Considering the Chandy-Lamport way of triggering the checkpoint via
>> > RPC calls to all the tasks: it combines two different mechanisms to
>> > make sure that the trigger is fast in different scenarios.
>> > But in the Flink world it might not be worth trying that way, as
>> > Stephan's analysis shows. Another concern is that it might put a
>> > heavier load on the JobMaster, which has to broadcast this checkpoint
>> > RPC to all the tasks in a large-scale job, especially with a very short
>> > checkpoint interval. Furthermore, it could delay the execution of other
>> > important RPCs and bring potential timeout risks.
>> >
>> > 2. I agree with the idea of adopting "take state snapshot on first
>> > barrier" from Chandy-Lamport instead of barrier alignment, and
>> > combining it with unaligned checkpoints in Flink.
>> >
>> > > >>>> The benefit would be less latency increase in the channels which
>> > already have received barriers.
>> > > >>>> However, as mentioned before, not prioritizing the inputs from
>> > which barriers are still missing can also have an adverse effect.
>> >
>> > I think there will be no adverse effect if we do not prioritize the
>> > inputs w/o barriers in this case. After the sync snapshot, the task can
>> > actually process any input channel. For the input channel that received
>> > the first barrier, we already have an obvious boundary for persisting
>> > buffers. For the other channels w/o barriers, we could persist the
>> > following buffers of these channels until the barrier arrives from the
>> > network. With credit-based flow control, the barrier does not need
>> > credit to be transported, so as soon as the barrier overtakes the
>> > output queue on the sender side, the network stack transports it
>> > immediately, regardless of the input conditions on the receiver side.
>> > So there is no requirement to consume the accumulated buffers in these
>> > channels with higher priority. If so, it seems that we will not waste
>> > any CPU cycles, which was Piotr's earlier concern.
>> >
>>
>> I'm not sure I follow this. For the checkpoint to complete, any buffer
>> that
>> arrived prior to the barrier would have to be part of the checkpointed
>> state.
>> So wouldn't it be important to finish persisting these buffers as fast as
>> possible by prioritizing respective inputs? The task won't be able to
>> process records from the inputs that have seen the barrier fast when it is
>> already backpressured (or causing the backpressure).
>>
>>
>> >
>> > 3. Suppose the unaligned checkpoints perform well in practice: is it
>> > possible to make them the only mechanism for handling all the cases? I
>> > mean that in the non-backpressure scenario there are fewer buffers, or
>> > even none, in the input/output queues, so "overtaking barrier -->
>> > trigger snapshot on first barrier --> persist buffers" might still work
>> > well. Then we would not need to maintain two sets of mechanisms in the
>> > end.
>> >
>> > 4. The initial motivation of this discussion was the checkpoint
>> > timeout in the backpressure scenario. If we adjust the default timeout
>> > to a very large value, the checkpoint would never time out and we only
>> > need to wait for it to finish. Are there still any other
>> > problems/concerns if a checkpoint takes a long time to finish? Although
>> > we already know of some issues, it is better to gather more user
>> > feedback to confirm which aspects could be solved in this feature
>> > design. E.g. the sink commit delay might not be covered by the
>> > unaligned solution.
>> >
>>
>> Checkpoints taking too long is the concern that sparks this discussion
>> (timeout is just a symptom). The slowness issue also applies to the
>> savepoint use case. We would need to be able to take a savepoint fast in
>> order to roll forward a fix that can alleviate the backpressure (like
>> changing parallelism or making a different configuration change).
>>
>>
>> >
>> > Best,
>> > Zhijiang
>> > ------------------------------------------------------------------
>> > From:Stephan Ewen <se...@apache.org>
>> > Send Time:2019 Aug. 14 (Wed.) 17:43
>> > To:dev <dev@flink.apache.org>
>> > Subject:Re: Checkpointing under backpressure
>> >
>> > Quick note: The current implementation is
>> >
>> > Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)
>> >
>> > On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com>
>> > wrote:
>> >
>> > > > Thanks for the great ideas so far.
>> > >
>> > > +1
>> > >
>> > > Regarding other things raised, I mostly agree with Stephan.
>> > >
>> > > I like the idea of simultaneously starting the checkpoint everywhere
>> via
>> > > RPC call (especially in cases where Tasks are busy doing some
>> synchronous
>> > > operations for example for tens of milliseconds. In that case every
>> > network
>> > > exchange adds tens of milliseconds of delay in propagating the
>> > checkpoint).
>> > > However I agree that this might be a premature optimisation assuming
>> the
>> > > current state of our code (we already have checkpoint barriers).
>> > >
>> > > However I like the idea of switching from:
>> > >
>> > > 1. A -> S -> F (Align -> snapshot -> forward markers)
>> > >
>> > > To
>> > >
>> > > 2. S -> F -> L (Snapshot -> forward markers -> log pending channels)
>> > >
>> > > Or even to
>> > >
>> > > 6. F -> S -> L (Forward markers -> snapshot -> log pending channels)
>> > >
>> > > It feels to me like this would decouple propagation of checkpoints
>> from
>> > > costs of synchronous snapshots and waiting for all of the checkpoint
>> > > barriers to arrive (even if they will overtake in-flight records, this
>> > > might take some time).
>> > >
>> > > > What I like about the Chandy Lamport approach (2.) initiated from
>> > > sources is that:
>> > > >       - Snapshotting imposes no modification to normal processing.
>> > >
>> > > Yes, I agree that would be nice. Currently, during the alignment and
>> > > blocking of the input channels, we might be wasting CPU cycles of up
>> > stream
>> > > tasks. If we succeed in designing new checkpointing mechanism to not
>> > > disrupt/block regular data processing (% the extra IO cost for logging
>> > the
>> > > in-flight records), that would be a huge improvement.
>> > >
>> > > Piotrek
>> > >
>> > > > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com>
>> > wrote:
>> > > >
>> > > > Sure I see. In cases when no periodic aligned snapshots are employed
>> > > this is the only option.
>> > > >
>> > > > Two things that were not highlighted enough so far on the proposed
>> > > protocol (including my mails):
>> > > >       - The Recovery/Reconfiguration strategy should strictly
>> > prioritise
>> > > processing logged events before entering normal task input operation.
>> > > Otherwise causality can be violated. This also means dataflow recovery
>> > will
>> > > be expected to be slower than the one employed on an aligned snapshot.
>> > > >       - Same as with state capture, markers should be forwarded upon
>> > > first marker received on input. No later than that. Otherwise we have
>> > > duplicate side effects.
>> > > >
>> > > > Thanks for the great ideas so far.
>> > > >
>> > > > Paris
>> > > >
>> > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
>> > > >>
>> > > >> Scaling with unaligned checkpoints might be a necessity.
>> > > >>
>> > > >> Let's assume the job failed due to a lost TaskManager, but no new
>> > > >> TaskManager becomes available.
>> > > >> In that case we need to scale down based on the latest complete
>> > > checkpoint,
>> > > >> because we cannot produce a new checkpoint.
>> > > >>
>> > > >>
>> > > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <
>> > seniorcarb...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >>> +1 I think we are on the same page Stephan.
>> > > >>>
>> > > >>> Rescaling on unaligned checkpoint sounds challenging and a bit
>> > > >>> unnecessary. No?
>> > > >>> Why not sticking to aligned snapshots for live
>> > > reconfiguration/rescaling?
>> > > >>> It’s a pretty rare operation and it would simplify things by a
>> lot.
>> > > >>> Everything can be “staged” upon alignment including replacing
>> > channels
>> > > and
>> > > >>> tasks.
>> > > >>>
>> > > >>> -Paris
>> > > >>>
>> > > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
>> > > >>>>
>> > > >>>> Hi all!
>> > > >>>>
>> > > >>>> Yes, the first proposal of "unaligned checkpoints" (probably two
>> > years
>> > > >>> back
>> > > >>>> now) drew a major inspiration from Chandy Lamport, as did
>> actually
>> > the
>> > > >>>> original checkpointing algorithm.
>> > > >>>>
>> > > >>>> "Logging data between first and last barrier" versus "barrier
>> > jumping
>> > > >>> over
>> > > >>>> buffer and storing those buffers" is pretty much the same.
>> > > >>>> However, there are a few nice benefits of the proposal of
>> unaligned
>> > > >>>> checkpoints over Chandy-Lamport.
>> > > >>>>
>> > > >>>> *## Benefits of Unaligned Checkpoints*
>> > > >>>>
>> > > >>>> (1) It is very similar to the original algorithm (can be seen as an
>> > > >>>> optional feature purely in the network stack) and thus can share
>> > > >>>> lots of code paths.
>> > > >>>>
>> > > >>>> (2) Less data stored. If we make the "jump over buffers" part
>> > timeout
>> > > >>> based
>> > > >>>> (for example barrier overtakes buffers if not flushed within
>> 10ms)
>> > > then
>> > > >>>> checkpoints are in the common case of flowing pipelines aligned
>> > > without
>> > > >>>> in-flight data. Only back pressured cases store some in-flight
>> data,
>> > > >>> which
>> > > >>>> means we don't regress in the common case and only fix the back
>> > > pressure
>> > > >>>> case.
>> > > >>>>
>> > > >>>> (3) Faster checkpoints. Chandy Lamport still waits for all
>> barriers
>> > to
>> > > >>>> arrive naturally, logging on the way. If data processing is slow,
>> > this
>> > > >>> can
>> > > >>>> still take quite a while.
>> > > >>>>
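Point (2) above can be sketched roughly as follows. This is only an illustration under assumed names (`barrier_overtakes`, `checkpoint_in_flight`, `timeout_ms`); the 10 ms default is the example value from the text, not an actual Flink setting:

```python
def barrier_overtakes(queued_buffers, waited_ms, timeout_ms=10):
    """Decide whether a barrier should jump ahead of the queued buffers.

    queued_buffers: buffers still sitting in front of the barrier.
    waited_ms: how long the barrier has been waiting behind them.
    """
    if not queued_buffers:
        return False  # pipeline is flowing, nothing to overtake
    return waited_ms >= timeout_ms


def checkpoint_in_flight(queued_buffers, waited_ms, timeout_ms=10):
    """Return the buffers that must be stored with the checkpoint."""
    if barrier_overtakes(queued_buffers, waited_ms, timeout_ms):
        # Back-pressured case: the overtaken buffers become checkpoint state.
        return list(queued_buffers)
    # Common case: the barrier stays in order, no in-flight data is stored.
    return []
```

In a flowing pipeline the queue drains before the timeout and the checkpoint carries no in-flight data; only the back-pressured case stores buffers.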
>> > > >>>> ==> I think both these points are strong reasons to not change
>> the
>> > > >>>> mechanism away from "trigger sources" and start with CL-style
>> > "trigger
>> > > >>> all".
>> > > >>>>
>> > > >>>>
>> > > >>>> *## Possible ways to combine Chandy Lamport and Unaligned
>> > Checkpoints*
>> > > >>>>
>> > > >>>> We can think about something like "take state snapshot on first
>> > > barrier"
>> > > >>>> and then store buffers until the other barriers arrive. Inside
>> the
>> > > >>> network
>> > > >>>> stack, barriers could still overtake and persist buffers.
>> > > >>>> The benefit would be less latency increase in the channels which
>> > > already
>> > > >>>> have received barriers.
>> > > >>>> However, as mentioned before, not prioritizing the inputs from
>> which
>> > > >>>> barriers are still missing can also have an adverse effect.
>> > > >>>>
>> > > >>>>
>> > > >>>> *## Concerning upgrades*
>> > > >>>>
>> > > >>>> I think it is a fair restriction to say that upgrades need to
>> happen
>> > > on
>> > > >>>> aligned checkpoints. It is a rare enough operation.
>> > > >>>>
>> > > >>>>
>> > > >>>> *## Concerning re-scaling (changing parallelism)*
>> > > >>>>
>> > > >>>> We need to support that on unaligned checkpoints as well. There
>> are
>> > > >>> several
>> > > >>>> feature proposals about automatic scaling, especially down
>> scaling
>> > in
>> > > >>> case
>> > > >>>> of missing resources. The last snapshot might be a regular
>> > > checkpoint, so
>> > > >>>> all checkpoints need to support rescaling.
>> > > >>>>
>> > > >>>>
>> > > >>>> *## Concerning end-to-end checkpoint duration and "trigger
>> sources"
>> > > >>> versus
>> > > >>>> "trigger all"*
>> > > >>>>
>> > > >>>> I think for the end-to-end checkpoint duration, an "overtake
>> > buffers"
>> > > >>>> approach yields faster checkpoints, as mentioned above (Chandy
>> > Lamport
>> > > >>>> logging still needs to wait for barrier to flow).
>> > > >>>>
>> > > >>>> I don't see the benefit of a "trigger all tasks via RPC
>> > concurrently"
>> > > >>>> approach. Bear in mind that it is still a globally coordinated
>> > > approach
>> > > >>> and
>> > > >>>> you need to wait for the global checkpoint to complete before
>> > > committing
>> > > >>>> any side effects.
>> > > >>>> I believe that the checkpoint time is more determined by the
>> state
>> > > >>>> checkpoint writing, and the global coordination and metadata
>> commit,
>> > > than
>> > > >>>> by the difference in alignment time between "trigger from source
>> and
>> > > jump
>> > > >>>> over buffers" versus "trigger all tasks concurrently".
>> > > >>>>
>> > > >>>> Trying to optimize a few tens of milliseconds out of the network
>> > stack
>> > > >>>> sends (and changing the overall checkpointing approach completely
>> > for
>> > > >>> that)
>> > > >>>> while staying with a globally coordinated checkpoint will send us
>> > > down a
>> > > >>>> path to a dead end.
>> > > >>>>
>> > > >>>> To really bring task persistence latency down to 10s of
>> milliseconds
>> > > (so
>> > > >>> we
>> > > >>>> can frequently commit in sinks), we need to take an approach
>> without
>> > > any
>> > > >>>> global coordination. Tasks need to establish a persistent
>> recovery
>> > > point
>> > > >>>> individually and at their own discretion, only then can it be
>> > frequent
>> > > >>>> enough. To get there, they would need to decouple themselves from
>> > the
>> > > >>>> predecessor and successor tasks (via something like persistent
>> > > channels).
>> > > >>>> This is a different discussion, though, somewhat orthogonal to
>> this
>> > > one
>> > > >>>> here.
>> > > >>>>
>> > > >>>> Best,
>> > > >>>> Stephan
>> > > >>>>
>> > > >>>>
>> > > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <
>> > pi...@ververica.com>
>> > > >>> wrote:
>> > > >>>>
>> > > >>>>> Hi again,
>> > > >>>>>
>> > > >>>>> Zhu Zhu, let me think about this more. Maybe, as Paris is writing,
>> > > >>>>> we do not need to block any channels at all, at least assuming
>> > > >>>>> credit-based flow control. What should happen with the following
>> > > >>>>> checkpoint is another question. Also, should we support concurrent
>> > > >>>>> checkpoints and subsuming checkpoints as we do now? Maybe not…
>> > > >>>>>
>> > > >>>>> Paris
>> > > >>>>>
>> > > >>>>> Re
>> > > >>>>> I. 2. a) and b) - yes, this would have to be taken into account
>> > > >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint time
>> > > >>>>> will probably be longer than it could be. It might affect external
>> > > >>>>> systems. For example Kafka, which automatically times out lingering
>> > > >>>>> transactions, and for us, the transaction time is equal to the time
>> > > >>>>> between two checkpoints.
>> > > >>>>>
>> > > >>>>> II 1. - I’m confused. To make things straight. Flink is
>> currently
>> > > >>>>> snapshotting once it receives all of the checkpoint barriers
>> from
>> > > all of
>> > > >>>>> the input channels and only then it broadcasts the checkpoint
>> > barrier
>> > > >>> down
>> > > >>>>> the stream. And this is correct from exactly-once perspective.
>> > > >>>>>
>> > > >>>>> As far as I understand, your proposal based on Chandy Lamport
>> > > algorithm,
>> > > >>>>> is snapshotting the state of the operator on the first
>> checkpoint
>> > > >>> barrier,
>> > > >>>>> which also looks correct to me.
>> > > >>>>>
>> > > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about
>> > > this.
>> > > >>>>>
>> > > >>>>> V. Yes, we still need aligned checkpoints, as they are easier
>> for
>> > > state
>> > > >>>>> migration and upgrades.
>> > > >>>>>
>> > > >>>>> Piotrek
>> > > >>>>>
>> > > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <
>> seniorcarb...@gmail.com>
>> > > >>> wrote:
>> > > >>>>>>
>> > > >>>>>> Now I see a little more clearly what you have in mind. Thanks
>> for
>> > > the
>> > > >>>>> explanation!
>> > > >>>>>> There are a few intermixed concepts here, some having to do with
>> > > >>>>>> correctness, some with performance.
>> > > >>>>>> Before delving deeper I will just enumerate a few things to
>> make
>> > > myself
>> > > >>>>> a little more helpful if I can.
>> > > >>>>>>
>> > > >>>>>> I. Initiation
>> > > >>>>>> -------------
>> > > >>>>>>
>> > > >>>>>> 1. RPC to sources only is a less intrusive way to initiate
>> > snapshots
>> > > >>>>> since you utilize better pipeline parallelism (only a small
>> subset
>> > of
>> > > >>> tasks
>> > > >>>>> is running progressively the protocol at a time, if
>> snapshotting is
>> > > >>> async
>> > > >>>>> the overall overhead might not even be observable).
>> > > >>>>>>
>> > > >>>>>> 2. If we really want an RPC to all initiation take notice of
>> the
>> > > >>>>> following implications:
>> > > >>>>>>
>> > > >>>>>>    a. (correctness) RPC calls are not guaranteed to arrive in
>> > every
>> > > >>>>> task before a marker from a preceding task.
>> > > >>>>>>
>> > > >>>>>>    b. (correctness) Either the RPC call OR the first arriving
>> > marker
>> > > >>>>> should initiate the algorithm. Whichever comes first. If you
>> only
>> > do
>> > > it
>> > > >>> per
>> > > >>>>> RPC call then you capture a "late" state that includes side
>> effects
>> > > of
>> > > >>>>> already logged events.
>> > > >>>>>>
>> > > >>>>>>    c. (performance) Lots of IO will be invoked at the same
>> time on
>> > > >>>>> the backend store from all tasks. This might lead to high
>> > congestion
>> > > in
>> > > >>>>> async snapshots.
>> > > >>>>>>
>> > > >>>>>> II. Capturing State First
>> > > >>>>>> -------------------------
>> > > >>>>>>
>> > > >>>>>> 1. (correctness) Capturing state at the last marker sounds
>> > > incorrect to
>> > > >>>>> me (state contains side effects of already logged events based
>> on
>> > the
>> > > >>>>> proposed scheme). This results in duplicate processing. No?
>> > > >>>>>>
>> > > >>>>>> III. Channel Blocking / "Alignment"
>> > > >>>>>> -----------------------------------
>> > > >>>>>>
>> > > >>>>>> 1. (performance?) What is the added benefit? We don't want a
>> > > >>>>>> "complete" transactional snapshot; async snapshots are purely for
>> > > >>>>>> failure-recovery. Thus, I don't see why this needs to be imposed
>> > > >>>>>> at the expense of performance/throughput. With the proposed scheme
>> > > >>>>>> the whole dataflow anyway enters snapshotting/logging mode, so
>> > > >>>>>> tasks more or less snapshot concurrently.
>> > > >>>>>>
>> > > >>>>>> IV Marker Bypassing
>> > > >>>>>> -------------------
>> > > >>>>>>
>> > > >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots
>> so
>> > > with
>> > > >>>>> some quick thinking  correct. I will try to model this later and
>> > get
>> > > >>> back
>> > > >>>>> to you in case I find something wrong.
>> > > >>>>>>
>> > > >>>>>> 2. (performance) It also sounds like a meaningful
>> optimisation! I
>> > > like
>> > > >>>>> thinking of this as a push-based snapshot. i.e., the producing
>> task
>> > > >>> somehow
>> > > >>>>> triggers forward a consumer/channel to capture its state. By
>> > example
>> > > >>>>> consider T1 -> |marker t1| -> T2.
>> > > >>>>>>
>> > > >>>>>> V. Usage of "Async" Snapshots
>> > > >>>>>> ---------------------
>> > > >>>>>>
>> > > >>>>>> 1. Do you see this as a full replacement of "full" aligned
>> > > >>>>> snapshots/savepoints? In my view async snapshots will be needed
>> > from
>> > > >>> time
>> > > >>>>> to time but not as frequently. Yet, it seems like a valid
>> approach
>> > > >>> solely
>> > > >>>>> for failure-recovery on the same configuration. Here's why:
>> > > >>>>>>
>> > > >>>>>>    a. With original snapshotting there is a strong duality
>> between
>> > > >>>>>>    a stream input (offsets) and committed side effects
>> (internal
>> > > >>>>> states and external commits to transactional sinks). While in
>> the
>> > > async
>> > > >>>>> version, there are uncommitted operations (inflight records).
>> Thus,
>> > > you
>> > > >>>>> cannot use these snapshots for e.g., submitting sql queries with
>> > > >>> snapshot
>> > > >>>>> isolation. Also, the original snapshotting gives a lot of
>> potential
>> > > for
>> > > >>>>> flink to make proper transactional commits externally.
>> > > >>>>>>
>> > > >>>>>>    b. Reconfiguration is very tricky, you probably know that
>> > better.
>> > > >>>>> Inflight channel state is no longer valid in a new configuration
>> > > (i.e.,
>> > > >>> new
>> > > >>>>> dataflow graph, new operators, updated operator logic, different
>> > > >>> channels,
>> > > >>>>> different parallelism)
>> > > >>>>>>
>> > > >>>>>> 2. Async snapshots can also be potentially useful for
>> monitoring
>> > the
>> > > >>>>> general health of a dataflow since they can be analyzed by the
>> task
>> > > >>> manager
>> > > >>>>> about the general performance of a job graph and spot
>> bottlenecks
>> > for
>> > > >>>>> example.
>> > > >>>>>>
>> > > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com
>> >
>> > > wrote:
>> > > >>>>>>>
>> > > >>>>>>> Hi,
>> > > >>>>>>>
>> > > >>>>>>> Thomas:
>> > > >>>>>>> There are no Jira tickets yet (or maybe there is something
>> very
>> > old
>> > > >>>>> somewhere). First we want to discuss it, next present FLIP and
>> at
>> > > last
>> > > >>>>> create tickets :)
>> > > >>>>>>>
>> > > >>>>>>>> if I understand correctly, then the proposal is to not block
>> any
>> > > >>>>>>>> input channel at all, but only log data from the
>> backpressured
>> > > >>> channel
>> > > >>>>> (and
>> > > >>>>>>>> make it part of the snapshot) until the barrier arrives
>> > > >>>>>>>
>> > > >>>>>>> I would guess that it would be better to block the reads,
>> unless
>> > we
>> > > >>> can
>> > > >>>>> already process the records from the blocked channel…
>> > > >>>>>>>
>> > > >>>>>>> Paris:
>> > > >>>>>>>
>> > > >>>>>>> Thanks for the explanation Paris. I’m starting to understand
>> this
>> > > more
>> > > >>>>> and I like the idea of snapshotting the state of an operator
>> before
>> > > >>>>> receiving all of the checkpoint barriers - this would allow more
>> > > things
>> > > >>> to
>> > > >>>>> happen at the same time instead of sequentially. As Zhijiang has
>> > > pointed
>> > > >>>>> out there are some things not considered in your proposal:
>> > overtaking
>> > > >>>>> output buffers, but maybe those things could be incorporated
>> > > together.
>> > > >>>>>>>
>> > > >>>>>>> Another thing is that from the wiki description I understood
>> that
>> > > the
>> > > >>>>> initial checkpointing is not initialised by any checkpoint
>> barrier,
>> > > but
>> > > >>> by
>> > > >>>>> an independent call/message from the Observer. I haven’t played
>> > with
>> > > >>> this
>> > > >>>>> idea a lot, but I had some discussion with Nico and it seems
>> that
>> > it
>> > > >>> might
>> > > >>>>> work:
>> > > >>>>>>>
>> > > >>>>>>> 1. JobManager sends an RPC “start checkpoint” to all tasks
>> > > >>>>>>> 2. Task (with two input channels l1 and l2) upon receiving RPC
>> > from
>> > > >>> 1.,
>> > > >>>>> takes a snapshot of its state and:
>> > > >>>>>>> a) broadcast checkpoint barrier down the stream to all
>> channels
>> > > (let’s
>> > > >>>>> ignore for a moment potential for this barrier to overtake the
>> > buffer
>> > > >>>>> output data)
>> > > >>>>>>> b) for any input channel for which it hasn’t yet received
>> > > checkpoint
>> > > >>>>> barrier, the data are being added to the checkpoint
>> > > >>>>>>> c) once a channel (for example l1) receives a checkpoint
>> barrier,
>> > > the
>> > > >>>>> Task blocks reads from that channel (?)
>> > > >>>>>>> d) after all remaining channels (l2) receive checkpoint
>> barriers,
>> > > the
>> > > >>>>> Task  first has to process the buffered data after that it can
>> > > unblock
>> > > >>> the
>> > > >>>>> reads from the channels
>> > > >>>>>>>
>> > > >>>>>>> Checkpoint barriers do not cascade/flow through different
>> tasks
>> > > here.
>> > > >>>>> Checkpoint barrier emitted from Task1, reaches only the
>> immediate
>> > > >>>>> downstream Tasks. Thanks to this setup, total checkpointing
>> time is
>> > > not
>> > > >>> sum
>> > > >>>>> of checkpointing times of all Tasks one by one, but more or less
>> > max
>> > > of
>> > > >>> the
>> > > >>>>> slowest Tasks. Right?
>> > > >>>>>>>
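The "max instead of sum" argument above can be made concrete with a tiny calculation. The per-task durations are made-up numbers purely for illustration, and the model ignores alignment and network delays:

```python
# Hypothetical per-task snapshot durations in milliseconds (not measurements).
task_snapshot_ms = {"source": 40, "map": 120, "sink": 60}

# Barriers cascading task by task: each task waits for its predecessor,
# so the durations add up along the pipeline.
cascading_ms = sum(task_snapshot_ms.values())

# All tasks triggered at once via RPC: they snapshot concurrently,
# so the slowest task roughly determines the total.
concurrent_ms = max(task_snapshot_ms.values())

print(cascading_ms, concurrent_ms)
```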
>> > > >>>>>>> Couple of intriguing thoughts are:
>> > > >>>>>>> 3. checkpoint barriers overtaking the output buffers
>> > > >>>>>>> 4. can we keep processing some data (in order to not waste CPU
>> > > cycles)
>> > > >>>>> after we have taken the snapshot of the Task. I think we could.
>> > > >>>>>>>
>> > > >>>>>>> Piotrek
>> > > >>>>>>>
>> > > >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org>
>> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>> Great discussion! I'm excited that this is already under
>> > > >>>>> consideration! Are
>> > > >>>>>>>> there any JIRAs or other traces of discussion to follow?
>> > > >>>>>>>>
>> > > >>>>>>>> Paris, if I understand correctly, then the proposal is to not
>> > > block
>> > > >>> any
>> > > >>>>>>>> input channel at all, but only log data from the
>> backpressured
>> > > >>> channel
>> > > >>>>> (and
>> > > >>>>>>>> make it part of the snapshot) until the barrier arrives?
>> This is
>> > > >>>>>>>> intriguing. But probably there is also a benefit to not
>> > > >>>>>>>> continuing to read
>> > > >>>>>>>> I1 since that could speed up retrieval from I2. Also, if the
>> > user
>> > > >>> code
>> > > >>>>> is
>> > > >>>>>>>> the cause of backpressure, this would avoid pumping more data
>> > into
>> > > >>> the
>> > > >>>>>>>> process function.
>> > > >>>>>>>>
>> > > >>>>>>>> Thanks,
>> > > >>>>>>>> Thomas
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <
>> > > wangzhijiang...@aliyun.com
>> > > >>>>> .invalid>
>> > > >>>>>>>> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Hi Paris,
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks for the detailed sharing. I think it is very similar to
>> > > >>>>>>>>> the way of overtaking we proposed before.
>> > > >>>>>>>>>
>> > > >>>>>>>>> There are some tiny differences:
>> > > >>>>>>>>> The overtaking approach might need to snapshot all the
>> > > >>>>>>>>> input/output queues. Chandy-Lamport seems to only need to
>> > > >>>>>>>>> snapshot (n-1) input channels after the first barrier arrives,
>> > > >>>>>>>>> which might reduce the state size a bit. But normally there
>> > > >>>>>>>>> should be fewer buffers in the first input channel with a
>> > > >>>>>>>>> barrier.
>> > > >>>>>>>>> The output barrier still follows the regular data stream in
>> > > >>>>>>>>> Chandy-Lamport, the same way as in current Flink. For the
>> > > >>>>>>>>> overtaking approach, we need to pay extra effort to transport
>> > > >>>>>>>>> the barrier ahead of the output queue on the upstream side, and
>> > > >>>>>>>>> to change barrier alignment to be based on receiving instead of
>> > > >>>>>>>>> the current reading on the downstream side.
>> > > >>>>>>>>> In backpressure caused by data skew, the first barrier in an
>> > > >>>>>>>>> almost empty input channel should arrive much earlier than the
>> > > >>>>>>>>> one in the last, heavily loaded input channel, so
>> > > >>>>>>>>> Chandy-Lamport could benefit well. But in the case of all input
>> > > >>>>>>>>> channels being evenly and heavily loaded, even the first barrier
>> > > >>>>>>>>> might still take a long time to arrive, so the overtaking
>> > > >>>>>>>>> approach could still fit well to speed up the checkpoint.
>> > > >>>>>>>>> Anyway, your suggestion is helpful on my side, especially
>> > > >>>>>>>>> considering some implementation details.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Best,
>> > > >>>>>>>>> Zhijiang
>> > > >>>>>>>>>
>> > > ------------------------------------------------------------------
>> > > >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
>> > > >>>>>>>>> Send Time:2019 Aug. 13 (Tue.) 14:03
>> > > >>>>>>>>> To:dev <dev@flink.apache.org>
>> > > >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
>> > > >>>>>>>>> Subject:Re: Checkpointing under backpressure
>> > > >>>>>>>>>
>> > > >>>>>>>>> yes! It’s quite similar I think.  Though mind that the
>> devil is
>> > > in
>> > > >>> the
>> > > >>>>>>>>> details, i.e., the temporal order in which actions are taken.
>> > > >>>>>>>>>
>> > > >>>>>>>>> To clarify, let us say you have a task T with two input
>> > channels
>> > > I1
>> > > >>>>> and I2.
>> > > >>>>>>>>> The Chandy Lamport execution flow is the following:
>> > > >>>>>>>>>
>> > > >>>>>>>>> 1) T receives barrier from  I1 and...
>> > > >>>>>>>>> 2)  ...the following three actions happen atomically
>> > > >>>>>>>>> I )  T snapshots its state T*
>> > > >>>>>>>>> II)  T forwards marker to its outputs
>> > > >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer
>> M*
>> > > >>>>>>>>> - Also notice here that T does NOT block I1 as it does in
>> > aligned
>> > > >>>>>>>>> snapshots -
>> > > >>>>>>>>> 3) Eventually T receives barrier from I2 and stops recording
>> > > events.
>> > > >>>>> Its
>> > > >>>>>>>>> asynchronously captured snapshot is now complete: {T*,M*}.
>> > > >>>>>>>>> Upon recovery all messages of M* should be replayed in FIFO
>> > > order.
>> > > >>>>>>>>>
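[Editor's illustration] The flow in steps 1) to 3) above can be sketched as a small simulation. This is only a toy model under stated assumptions, not Flink code: the `Task` class, the `BARRIER` sentinel, the `recover` helper, and the running-sum state are all hypothetical names chosen for the example, and it handles a single checkpoint only.

```python
from dataclasses import dataclass, field

BARRIER = object()  # hypothetical stand-in for a checkpoint barrier


@dataclass
class Task:
    """Toy task T whose state is a running sum of the integers it receives."""
    channels: tuple
    state: int = 0
    outputs: list = field(default_factory=list)
    snapshot_state: object = None                     # T*
    channel_log: list = field(default_factory=list)   # M*
    pending: set = field(default_factory=set)         # channels still missing a barrier
    in_progress: bool = False
    complete: bool = False

    def on_event(self, channel, item):
        if item is BARRIER:
            if not self.in_progress:
                # Step 2: on the first barrier, do I-III together.
                self.in_progress = True
                self.snapshot_state = self.state                   # I)   snapshot T*
                self.outputs.append(BARRIER)                       # II)  forward marker
                self.pending = set(self.channels) - {channel}      # III) log the others
            else:
                self.pending.discard(channel)
            if not self.pending:
                # Step 3: last barrier seen, the snapshot {T*, M*} is complete.
                self.in_progress = False
                self.complete = True
            return
        # Records are always processed; no channel is ever blocked.
        if self.in_progress and channel in self.pending:
            self.channel_log.append((channel, item))
        self.state += item
        self.outputs.append(item)


def recover(snapshot_state, channel_log):
    """Restore T* and replay M* in FIFO order; returns the rebuilt state."""
    state = snapshot_state
    for _channel, record in channel_log:
        state += record
    return state


task = Task(channels=("I1", "I2"))
events = [("I1", 1), ("I2", 10), ("I1", BARRIER), ("I1", 2),
          ("I2", 20), ("I2", 30), ("I2", BARRIER), ("I2", 40)]
for channel, item in events:
    task.on_event(channel, item)
```

In this run the record `2` from I1 is processed right after I1's barrier without being logged, while `20` and `30` from I2 are both processed and logged, so replaying M* on top of T* rebuilds exactly the pre-barrier contributions of both channels.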
>> > > >>>>>>>>> With this approach alignment does not create a deadlock
>> > situation
>> > > >>>>> since
>> > > >>>>>>>>> anyway 2.II happens asynchronously and messages can be
>> logged
>> > as
>> > > >>> well
>> > > >>>>>>>>> asynchronously during the process of the snapshot. If there
>> is
>> > > >>>>>>>>> back-pressure in a pipeline the cause is most probably not
>> this
>> > > >>>>> algorithm.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Back to your observation, the answer : yes and no.  In your
>> > > network
>> > > >>>>> model,
>> > > >>>>>>>>> I can see the logic of “logging” and “committing” a final
>> > > snapshot
>> > > >>>>> being
>> > > >>>>>>>>> provided by the channel implementation. However, do mind
>> that
>> > the
>> > > >>>>> first
>> > > >>>>>>>>> barrier always needs to go “all the way” to initiate the
>> Chandy
>> > > >>>>> Lamport
>> > > >>>>>>>>> algorithm logic.
>> > > >>>>>>>>>
>> > > >>>>>>>>> The above flow has been proven using temporal logic in my
>> phd
>> > > thesis
>> > > >>>>> in
>> > > >>>>>>>>> case you are interested about the proof.
>> > > >>>>>>>>> I hope this helps a little clarifying things. Let me know if
>> > > there
>> > > >>> is
>> > > >>>>> any
>> > > >>>>>>>>> confusing point to disambiguate. I would be more than happy
>> to
>> > > help
>> > > >>>>> if I
>> > > >>>>>>>>> can.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Paris
>> > > >>>>>>>>>
>> > > >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <
>> pi...@ververica.com
>> > >
>> > > >>>>> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport
>> snapshots
>> > > don’t
>> > > >>>>> you
>> > > >>>>>>>>> still have to wait for the “checkpoint barrier” to arrive in
>> > > order
>> > > >>> to
>> > > >>>>> know
>> > > >>>>>>>>> when have you already received all possible messages from
>> the
>> > > >>> upstream
>> > > >>>>>>>>> tasks/operators? So instead of processing the “in flight”
>> > > messages
>> > > >>>>> (as the
>> > > >>>>>>>>> Flink is doing currently), you are sending them to an
>> > “observer”?
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> In that case, that sounds similar to “checkpoint barriers
>> > > >>>>> overtaking
>> > > >>>>>>>>> in flight records” (aka unaligned checkpoints). Just for us,
>> > the
>> > > >>>>> observer
>> > > >>>>>>>>> is a snapshot state.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Piotrek
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <
>> > > seniorcarb...@gmail.com>
>> > > >>>>>>>>> wrote:
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe
>> Chandy-Lamport
>> > > >>>>> snapshots
>> > > >>>>>>>>> [1] would help out solve this problem more elegantly without
>> > > >>>>> sacrificing
>> > > >>>>>>>>> correctness.
>> > > >>>>>>>>>>> - They do not need alignment, only (async) logging for
>> > > in-flight
>> > > >>>>>>>>> records between the time the first barrier is processed
>> until
>> > the
>> > > >>> last
>> > > >>>>>>>>> barrier arrives in a task.
>> > > >>>>>>>>>>> - They work fine for failure recovery as long as logged
>> > records
>> > > >>> are
>> > > >>>>>>>>> replayed on startup.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Flink’s “aligned” savepoints would probably be still
>> > necessary
>> > > >>> for
>> > > >>>>>>>>> transactional sink commits + any sort of reconfiguration
>> (e.g.,
>> > > >>>>> rescaling,
>> > > >>>>>>>>> updating the logic of operators to evolve an application
>> etc.).
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> I don’t completely understand the “overtaking” approach
>> but
>> > if
>> > > you
>> > > >>>>> have
>> > > >>>>>>>>> a concrete definition I would be happy to check it out and
>> help
>> > > if I
>> > > >>>>> can!
>> > > >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging
>> > > things
>> > > >>> in
>> > > >>>>>>>>> pending channels in a task snapshot before the barrier
>> arrives.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> -Paris
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> [1]
>> > > >>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <
>> > pi...@ververica.com
>> > > >
>> > > >>>>> wrote:
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Hi Thomas,
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of
>> > > >>> discussing
>> > > >>>>> how
>> > > >>>>>>>>> to address this issue and one of the solutions that we are
>> > > discussing
>> > > >>>>> is
>> > > >>>>>>>>> exactly what you are proposing: checkpoint barriers
>> overtaking
>> > > the
>> > > >>> in
>> > > >>>>>>>>> flight data and make the in flight data part of the
>> checkpoint.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> If everything works well, we will be able to present
>> result
>> > of
>> > > >>> our
>> > > >>>>>>>>> discussions on the dev mailing list soon.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Piotrek
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <
>> > > wangzhijiang...@aliyun.com
>> > > >>>>> .INVALID>
>> > > >>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hi Thomas,
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Thanks for proposing this concern. The barrier alignment
>> > > takes
>> > > >>>>> long
>> > > >>>>>>>>> time in backpressure case which could cause several
>> problems:
>> > > >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned.
>> > > >>>>>>>>>>>>> 2. The recovery cost is high once failover, because much
>> > data
>> > > >>>>> needs
>> > > >>>>>>>>> to be replayed.
>> > > >>>>>>>>>>>>> 3. The delay for commit-based sink is high in
>> exactly-once.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> For credit-based flow control from release-1.5, the
>> amount
>> > of
>> > > >>>>>>>>> in-flighting buffers before barrier alignment is reduced,
>> so we
>> > > >>> could
>> > > >>>>> get a
>> > > >>>>>>>>> bit
>> > > >>>>>>>>>>>>> benefits from speeding checkpoint aspect.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the channels
>> > which
>> > > >>>>> already
>> > > >>>>>>>>> received the barrier in practice. But actually we ever did
>> the
>> > > >>>>> similar thing
>> > > >>>>>>>>>>>>> to speed barrier alignment before. I am not quite sure
>> that
>> > > >>>>>>>>> release-1.8 covers this feature. There were some relevant
>> > > >>> discussions
>> > > >>>>> under
>> > > >>>>>>>>> jira [1].
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> For release-1.10, the community is now discussing the
>> > > feature of
>> > > >>>>>>>>> unaligned checkpoint which is mainly for resolving above
>> > > concerns.
>> > > >>> The
>> > > >>>>>>>>> basic idea
>> > > >>>>>>>>>>>>> is to make the barrier overtake the output/input buffer
>> queue
>> > to
>> > > >>>>> speed
>> > > >>>>>>>>> alignment, and snapshot the input/output buffers as part of
>> > > >>> checkpoint
>> > > >>>>>>>>> state. The
>> > > >>>>>>>>>>>>> details have not been confirmed yet and are still under
>> > discussion.
>> > > >>>>> Wish we
>> > > >>>>>>>>> could make some improvements for the release-1.10.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Best,
>> > > >>>>>>>>>>>>> Zhijiang
>> > > >>>>>>>>>>>>>
>> > > >>> ------------------------------------------------------------------
>> > > >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org>
>> > > >>>>>>>>>>>>> Send Time:2019 Aug. 12 (Mon.) 21:38
>> > > >>>>>>>>>>>>> To:dev <dev@flink.apache.org>
>> > > >>>>>>>>>>>>> Subject:Checkpointing under backpressure
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hi,
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> One of the major operational difficulties we observe
>> with
>> > > Flink
>> > > >>>>> are
>> > > >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for
>> > both
>> > > >>>>>>>>> confirmation
>> > > >>>>>>>>>>>>> of my understanding of the current behavior as well as
>> > > pointers
>> > > >>>>> for
>> > > >>>>>>>>> future
>> > > >>>>>>>>>>>>> improvement work:
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Prior to introduction of credit based flow control in
>> the
>> > > >>> network
>> > > >>>>>>>>> stack [1]
>> > > >>>>>>>>>>>>> [2], checkpoint barriers would back up with the data for
>> > all
>> > > >>>>> logical
>> > > >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the
>> > > buffers
>> > > >>> are
>> > > >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are only
>> > held
>> > > >>>>> back for
>> > > >>>>>>>>>>>>> channels that have backpressure, while others can
>> continue
>> > > >>>>> processing
>> > > >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot
>> > "overtake
>> > > >>>>> data",
>> > > >>>>>>>>>>>>> therefore checkpoint alignment remains affected for the
>> > > channel
>> > > >>>>> with
>> > > >>>>>>>>>>>>> backpressure, with the potential for slow checkpointing
>> and
>> > > >>>>> timeouts.
>> > > >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the
>> maximum
>> > > >>>>> in-transit
>> > > >>>>>>>>>>>>> buffers per channel, resulting in an improvement
>> compared
>> > to
>> > > >>>>> previous
>> > > >>>>>>>>>>>>> versions of Flink. Also, the backpressure based
>> checkpoint
>> > > >>>>> alignment
>> > > >>>>>>>>> can
>> > > >>>>>>>>>>>>> help the barrier advance faster on the receiver side (by
>> > > >>>>> suspending
>> > > >>>>>>>>>>>>> channels that have already delivered the barrier). Is
>> that
>> > > >>>>> accurate
>> > > >>>>>>>>> as of
>> > > >>>>>>>>>>>>> Flink 1.8?
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> What appears to be missing to completely unblock
>> > > checkpointing
>> > > >>> is
>> > > >>>>> a
>> > > >>>>>>>>>>>>> mechanism for checkpoints to overtake the data. That
>> would
>> > > help
>> > > >>> in
>> > > >>>>>>>>>>>>> situations where the processing itself is the bottleneck
>> > and
>> > > >>>>>>>>> prioritization
>> > > >>>>>>>>>>>>> in the network stack alone cannot address the barrier
>> > delay.
>> > > Was
>> > > >>>>>>>>> there any
>> > > >>>>>>>>>>>>> related discussion? One possible solution would be to
>> drain
>> > > >>>>> incoming
>> > > >>>>>>>>> data
>> > > >>>>>>>>>>>>> till the barrier and make it part of the checkpoint
>> instead
>> > > of
>> > > >>>>>>>>> processing
>> > > >>>>>>>>>>>>> it. This is somewhat related to asynchronous processing,
>> > but
>> > > I'm
>> > > >>>>>>>>> thinking
>> > > >>>>>>>>>>>>> more of a solution that is automated in the Flink
>> runtime
>> > for
>> > > >>> the
>> > > >>>>>>>>>>>>> backpressure scenario only.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>>>> Thomas
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> [1]
>> > > >>> https://flink.apache.org/2019/06/05/flink-network-stack.html
>> > > >>>>>>>>>>>>> [2]
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>
>> > > >>>
>> > >
>> >
>> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>
>> > > >>>
>> > > >
>> > >
>> > >
>> >
>> >
>>
>>
>>
