Scaling with unaligned checkpoints might be a necessity. Let's assume the job failed due to a lost TaskManager, but no new TaskManager becomes available. In that case we need to scale down based on the latest complete checkpoint, because we cannot produce a new checkpoint.
On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com> wrote:

+1 I think we are on the same page, Stephan.

Rescaling on unaligned checkpoints sounds challenging and a bit unnecessary. No?
Why not stick to aligned snapshots for live reconfiguration/rescaling? It's a pretty rare operation and it would simplify things by a lot. Everything can be "staged" upon alignment, including replacing channels and tasks.

-Paris

On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:

Hi all!

Yes, the first proposal of "unaligned checkpoints" (probably two years back now) drew major inspiration from Chandy-Lamport, as did the original checkpointing algorithm.

"Logging data between first and last barrier" versus "barrier jumping over buffers and storing those buffers" is pretty much the same. However, there are a few nice benefits of the unaligned checkpoints proposal over Chandy-Lamport.

*## Benefits of Unaligned Checkpoints*

(1) It is very similar to the original algorithm (can be seen as an optional feature purely in the network stack) and thus can share lots of code paths.

(2) Less data stored. If we make the "jump over buffers" part timeout based (for example, the barrier overtakes buffers only if they are not flushed within 10ms), then in the common case of flowing pipelines checkpoints are aligned without in-flight data. Only back-pressured cases store some in-flight data, which means we don't regress in the common case and only fix the back pressure case.

(3) Faster checkpoints. Chandy-Lamport still waits for all barriers to arrive naturally, logging on the way. If data processing is slow, this can still take quite a while.

==> I think both these points are strong reasons not to change the mechanism away from "trigger sources" and start with CL-style "trigger all".

*## Possible ways to combine Chandy-Lamport and Unaligned Checkpoints*

We can think about something like "take state snapshot on first barrier" and then store buffers until the other barriers arrive. Inside the network stack, barriers could still overtake and persist buffers. The benefit would be less latency increase in the channels which have already received barriers. However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.

*## Concerning upgrades*

I think it is a fair restriction to say that upgrades need to happen on aligned checkpoints. It is a rare enough operation.

*## Concerning re-scaling (changing parallelism)*

We need to support that on unaligned checkpoints as well. There are several feature proposals about automatic scaling, especially down-scaling in case of missing resources. The last snapshot might be a regular checkpoint, so all checkpoints need to support rescaling.

*## Concerning end-to-end checkpoint duration and "trigger sources" versus "trigger all"*

I think for the end-to-end checkpoint duration, an "overtake buffers" approach yields faster checkpoints, as mentioned above (Chandy-Lamport logging still needs to wait for the barrier to flow).

I don't see the benefit of a "trigger all tasks via RPC concurrently" approach. Bear in mind that it is still a globally coordinated approach and you need to wait for the global checkpoint to complete before committing any side effects. I believe that the checkpoint time is determined more by the state checkpoint writing, and the global coordination and metadata commit, than by the difference in alignment time between "trigger from source and jump over buffers" versus "trigger all tasks concurrently".

Trying to optimize a few tens of milliseconds out of the network stack (and changing the overall checkpointing approach completely for that) while staying with a globally coordinated checkpoint will send us down a path to a dead end.

To really bring task persistence latency down to tens of milliseconds (so we can frequently commit in sinks), we need to take an approach without any global coordination. Tasks need to establish a persistent recovery point individually and at their own discretion; only then can it be frequent enough. To get there, they would need to decouple themselves from the predecessor and successor tasks (via something like persistent channels). This is a different discussion, though, somewhat orthogonal to this one.

Best,
Stephan

On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi again,

Zhu Zhu, let me think about this more. Maybe, as Paris is writing, we do not need to block any channels at all, at least assuming credit-based flow control. What should happen with the following checkpoint is another question. Also, should we support concurrent checkpoints and subsuming checkpoints as we do now? Maybe not…

Paris, re:

I. 2. a) and b) - yes, this would have to be taken into account.
I. 2. c) and IV. 2. - without those, end-to-end checkpoint time will probably be longer than it could be. It might affect external systems, for example Kafka, which automatically times out lingering transactions; for us, the transaction time is equal to the time between two checkpoints.

II. 1. - I'm confused. To make things straight: Flink currently snapshots once it has received the checkpoint barriers from all of the input channels, and only then broadcasts the checkpoint barrier down the stream. And this is correct from an exactly-once perspective.

As far as I understand, your proposal based on the Chandy-Lamport algorithm is to snapshot the state of the operator on the first checkpoint barrier, which also looks correct to me.

III. 1. As I responded to Zhu Zhu, let me think a bit more about this.

V. Yes, we still need aligned checkpoints, as they are easier for state migration and upgrades.

Piotrek

On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> wrote:

Now I see a little more clearly what you have in mind. Thanks for the explanation!
There are a few intermixed concepts here, some having to do with correctness, some with performance.
Before delving deeper I will just enumerate a few things to make myself a little more helpful if I can.

I. Initiation
-------------

1. RPC to sources only is a less intrusive way to initiate snapshots since you utilize pipeline parallelism better (only a small subset of tasks is progressively running the protocol at a time; if snapshotting is async the overall overhead might not even be observable).

2. If we really want an RPC-to-all initiation, take notice of the following implications:

a. (correctness) RPC calls are not guaranteed to arrive in every task before a marker from a preceding task.

b. (correctness) Either the RPC call OR the first arriving marker should initiate the algorithm, whichever comes first. If you only do it per RPC call then you capture a "late" state that includes side effects of already logged events.

c. (performance) Lots of IO will be invoked at the same time on the backend store from all tasks. This might lead to high congestion in async snapshots.

II. Capturing State First
-------------------------

1. (correctness) Capturing state at the last marker sounds incorrect to me (the state contains side effects of already logged events based on the proposed scheme). This results in duplicate processing. No?

III. Channel Blocking / "Alignment"
-----------------------------------

1. (performance?) What is the added benefit? We don't want a "complete" transactional snapshot; async snapshots are purely for failure recovery. Thus, I don't see why this needs to be imposed at the expense of performance/throughput. With the proposed scheme the whole dataflow anyway enters snapshotting/logging mode, so tasks more or less snapshot concurrently.

IV. Marker Bypassing
--------------------

1. (correctness) This leads to equivalent in-flight snapshots, so with some quick thinking it is correct. I will try to model this later and get back to you in case I find something wrong.

2. (performance) It also sounds like a meaningful optimisation! I like thinking of this as a push-based snapshot, i.e., the producing task somehow triggers a downstream consumer/channel to capture its state. By example, consider T1 -> |marker t1| -> T2.

V. Usage of "Async" Snapshots
-----------------------------

1. Do you see this as a full replacement of "full" aligned snapshots/savepoints? In my view async snapshots will be needed from time to time, but not as frequently. Yet, it seems like a valid approach solely for failure recovery on the same configuration. Here's why:

a. With the original snapshotting there is a strong duality between a stream input (offsets) and committed side effects (internal states and external commits to transactional sinks). In the async version, there are uncommitted operations (in-flight records). Thus, you cannot use these snapshots for e.g. submitting SQL queries with snapshot isolation. Also, the original snapshotting gives Flink a lot of potential to make proper transactional commits externally.

b. Reconfiguration is very tricky, you probably know that better. In-flight channel state is no longer valid in a new configuration (i.e., new dataflow graph, new operators, updated operator logic, different channels, different parallelism).

2. Async snapshots can also be potentially useful for monitoring the general health of a dataflow, since they can be analyzed by the task manager to assess the general performance of a job graph and spot bottlenecks, for example.

On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

Thomas:
There are no Jira tickets yet (or maybe there is something very old somewhere). First we want to discuss it, next present a FLIP, and at last create tickets :)

> if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives

I would guess that it would be better to block the reads, unless we can already process the records from the blocked channel…

Paris:

Thanks for the explanation Paris. I'm starting to understand this more and I like the idea of snapshotting the state of an operator before receiving all of the checkpoint barriers - this would allow more things to happen at the same time instead of sequentially. As Zhijiang has pointed out, there are some things not considered in your proposal, such as overtaking output buffers, but maybe those things could be incorporated together.

Another thing is that from the wiki description I understood that the initial checkpointing is not initiated by any checkpoint barrier, but by an independent call/message from the Observer. I haven't played with this idea a lot, but I had some discussion with Nico and it seems that it might work:

1. JobManager sends an RPC "start checkpoint" to all tasks.
2. A Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a snapshot of its state and:
a) broadcasts the checkpoint barrier downstream to all channels (let's ignore for a moment the potential for this barrier to overtake the buffered output data)
b) for any input channel for which it hasn't yet received a checkpoint barrier, the data are added to the checkpoint
c) once a channel (for example l1) receives a checkpoint barrier, the Task blocks reads from that channel (?)
d) after all remaining channels (l2) receive checkpoint barriers, the Task first has to process the buffered data; after that it can unblock the reads from the channels

Checkpoint barriers do not cascade/flow through different tasks here. A checkpoint barrier emitted from Task1 reaches only the immediate downstream Tasks. Thanks to this setup, the total checkpointing time is not the sum of the checkpointing times of all Tasks one by one, but more or less the max over the slowest Tasks. Right?

A couple of intriguing thoughts are:
3. checkpoint barriers overtaking the output buffers
4. can we keep processing some data (in order to not waste CPU cycles) after we have taken the snapshot of the Task? I think we could.

Piotrek

On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:

Great discussion! I'm excited that this is already under consideration! Are there any JIRAs or other traces of discussion to follow?

Paris, if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives? This is intriguing. But probably there is also a benefit to not continuing to read I1, since that could speed up retrieval from I2. Also, if the user code is the cause of backpressure, this would avoid pumping more data into the process function.

Thanks,
Thomas

On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:

Hi Paris,

Thanks for the detailed sharing. I think it is very similar to the overtaking way we proposed before.

There are some tiny differences:
The overtaking way might need to snapshot all the input/output queues. Chandy-Lamport seems to only need to snapshot (n-1) input channels after the first barrier arrives, which might reduce the state size a bit. But normally there should be fewer buffers in the first input channel with the barrier.
The output barrier still follows the regular data stream in Chandy-Lamport, the same way as in current Flink. For the overtaking way, we need to pay extra effort to make the barrier be transported first, ahead of the output queue, on the upstream side, and to change barrier alignment to be based on receiving instead of the current reading on the downstream side.
In backpressure caused by data skew, the first barrier in the almost empty input channel should arrive much earlier than in the last heavily loaded input channel, so Chandy-Lamport could benefit well. But for the case of all input channels being heavily loaded and balanced, the first arriving barrier might still take much time, so the overtaking way could still fit well to speed up the checkpoint.
Anyway, your proposed suggestion is helpful on my side, especially considering some implementation details.

Best,
Zhijiang

------------------------------------------------------------------
From: Paris Carbone <seniorcarb...@gmail.com>
Send Time: Aug 13, 2019 (Tue) 14:03
To: dev <dev@flink.apache.org>
Cc: zhijiang <wangzhijiang...@aliyun.com>
Subject: Re: Checkpointing under backpressure

Yes! It's quite similar, I think. Though mind that the devil is in the details, i.e., the temporal order in which actions are taken.

To clarify, let us say you have a task T with two input channels I1 and I2.
The Chandy-Lamport execution flow is the following:

1) T receives a barrier from I1 and...
2) ...the following three actions happen atomically:
I) T snapshots its state T*
II) T forwards the marker to its outputs
III) T starts logging all events of I2 (only) into a buffer M*
- Also notice here that T does NOT block I1 as it does in aligned snapshots -
3) Eventually T receives the barrier from I2 and stops recording events. Its asynchronously captured snapshot is now complete: {T*, M*}.
Upon recovery all messages of M* should be replayed in FIFO order.
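To make the flow above concrete, here is a rough, purely illustrative sketch of how a task could implement it. The class and method names are hypothetical and are not Flink's actual runtime classes:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Illustrative sketch only: names are made up, not Flink's task/network code.
class ChandyLamportTask {
    private final Set<String> channelsAwaitingBarrier = new HashSet<>(); // barriers still missing
    private final Queue<Object> inFlightLog = new ArrayDeque<>();        // M*: logged in-flight records
    private boolean snapshotInProgress = false;
    private Object stateSnapshot;                                        // T*

    ChandyLamportTask(Set<String> inputChannels) {
        channelsAwaitingBarrier.addAll(inputChannels);
    }

    // Called for every record; note that no input is ever blocked.
    void onRecord(String channel, Object record) {
        if (snapshotInProgress && channelsAwaitingBarrier.contains(channel)) {
            inFlightLog.add(record); // log it for the snapshot, but still process it normally
        }
        process(record);
    }

    // Called when the barrier/marker arrives on a channel.
    void onBarrier(String channel) {
        if (!snapshotInProgress) {
            // first barrier: atomically capture T*, forward the marker, start logging the other channels
            stateSnapshot = captureState();
            forwardMarkerToOutputs();
            snapshotInProgress = true;
        }
        channelsAwaitingBarrier.remove(channel);
        if (channelsAwaitingBarrier.isEmpty()) {
            // last barrier: {T*, M*} is complete; M* is replayed in FIFO order on recovery
            persistSnapshot(stateSnapshot, inFlightLog);
            snapshotInProgress = false;
        }
    }

    private void process(Object record) { /* user function */ }
    private Object captureState() { return new Object(); /* async state snapshot */ }
    private void forwardMarkerToOutputs() { /* broadcast the marker downstream */ }
    private void persistSnapshot(Object state, Queue<Object> log) { /* write asynchronously */ }
}

The important point is that neither I1 nor I2 is blocked; records from I2 are only additionally logged until its barrier shows up.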
With this approach alignment does not create a deadlock situation, since 2.II happens asynchronously anyway and messages can also be logged asynchronously during the snapshotting process. If there is back-pressure in a pipeline, the cause is most probably not this algorithm.

Back to your observation, the answer is: yes and no. In your network model, I can see the logic of "logging" and "committing" a final snapshot being provided by the channel implementation. However, do mind that the first barrier always needs to go "all the way" to initiate the Chandy-Lamport algorithm logic.

The above flow has been proven using temporal logic in my PhD thesis, in case you are interested in the proof.
I hope this helps clarify things a little. Let me know if there is any confusing point to disambiguate. I would be more than happy to help if I can.

Paris

On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:

Thanks for the input. Regarding the Chandy-Lamport snapshots: don't you still have to wait for the "checkpoint barrier" to arrive in order to know when you have already received all possible messages from the upstream tasks/operators? So instead of processing the "in flight" messages (as Flink is doing currently), you are sending them to an "observer"?

In that case, that sounds similar to "checkpoint barriers overtaking in-flight records" (aka unaligned checkpoints). Just for us, the observer is the snapshot state.

Piotrek

On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:

Interesting problem! Thanks for bringing it up, Thomas.

Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly without sacrificing correctness.
- They do not need alignment, only (async) logging of in-flight records between the time the first barrier is processed and the time the last barrier arrives at a task.
- They work fine for failure recovery as long as logged records are replayed on startup.

Flink's "aligned" savepoints would probably still be necessary for transactional sink commits + any sort of reconfiguration (e.g., rescaling, updating the logic of operators to evolve an application, etc.).

I don't completely understand the "overtaking" approach, but if you have a concrete definition I would be happy to check it out and help if I can!
Mind that Chandy-Lamport essentially does this by logging things in pending channels in a task snapshot before the barrier arrives.

-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm

On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:

Hi Thomas,

As Zhijiang has responded, we are now in the process of discussing how to address this issue, and one of the solutions we are discussing is exactly what you are proposing: checkpoint barriers overtaking the in-flight data and making the in-flight data part of the checkpoint.

If everything works well, we will be able to present the results of our discussions on the dev mailing list soon.

Piotrek

On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:

Hi Thomas,

Thanks for raising this concern. Barrier alignment takes a long time in the backpressure case, which can cause several problems:
1. Checkpoint timeouts, as you mentioned.
2. High recovery cost after a failover, because much data needs to be replayed.
3. High delay for commit-based sinks in exactly-once mode.

With credit-based flow control (since release 1.5), the amount of in-flight buffers before barrier alignment is reduced, so we get some benefit in terms of checkpoint speed.

In release 1.8, I guess we did not suspend the channels which have already received the barrier in practice. But we actually did a similar thing to speed up barrier alignment before; I am not quite sure whether release 1.8 covers this feature. There were some relevant discussions under the Jira issue [1].

For release 1.10, the community is now discussing the feature of unaligned checkpoints, which is mainly for resolving the above concerns. The basic idea is to make the barrier overtake the output/input buffer queues to speed up alignment, and to snapshot the input/output buffers as part of the checkpoint state. The details have not been confirmed yet and are still under discussion. Hopefully we can make some improvements for release 1.10.

[1] https://issues.apache.org/jira/browse/FLINK-8523

Best,
Zhijiang

------------------------------------------------------------------
From: Thomas Weise <t...@apache.org>
Send Time: Aug 12, 2019 (Mon) 21:38
To: dev <dev@flink.apache.org>
Subject: Checkpointing under backpressure

Hi,

One of the major operational difficulties we observe with Flink is checkpoint timeouts under backpressure. I'm looking for both confirmation of my understanding of the current behavior as well as pointers for future improvement work:

Prior to the introduction of credit-based flow control in the network stack [1] [2], checkpoint barriers would back up with the data for all logical channels due to TCP backpressure. Since Flink 1.5, the buffers are controlled per channel, and checkpoint barriers are only held back for channels that have backpressure, while others can continue processing normally. However, checkpoint barriers still cannot "overtake data"; therefore checkpoint alignment remains affected for the channel with backpressure, with the potential for slow checkpointing and timeouts. The delay of barriers would, however, be capped by the maximum number of in-transit buffers per channel, an improvement compared to previous versions of Flink. Also, the backpressure-based checkpoint alignment can help the barrier advance faster on the receiver side (by suspending channels that have already delivered the barrier). Is that accurate as of Flink 1.8?

What appears to be missing to completely unblock checkpointing is a mechanism for checkpoints to overtake the data. That would help in situations where the processing itself is the bottleneck and prioritization in the network stack alone cannot address the barrier delay. Was there any related discussion? One possible solution would be to drain incoming data till the barrier and make it part of the checkpoint instead of processing it. This is somewhat related to asynchronous processing, but I'm thinking more of a solution that is automated in the Flink runtime for the backpressure scenario only.

Thanks,
Thomas

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
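For illustration, a minimal sketch of the "drain incoming data till the barrier and make it part of the checkpoint" idea could look like the following. All names are hypothetical, this is not Flink's actual network stack, and it assumes the operator state snapshot is taken once all barriers have arrived:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Illustrative sketch only: not Flink's actual input gate / channel classes.
class DrainingInputGate {
    private final Set<String> channelsAwaitingBarrier = new HashSet<>();
    private final Queue<byte[]> drainedInFlightData = new ArrayDeque<>(); // becomes part of the checkpoint
    private boolean checkpointInProgress = false;

    DrainingInputGate(Set<String> inputChannels) {
        channelsAwaitingBarrier.addAll(inputChannels);
    }

    void onRecord(String channel, byte[] serializedRecord) {
        if (!checkpointInProgress) {
            process(serializedRecord);
        } else if (channelsAwaitingBarrier.contains(channel)) {
            // Drain: persist instead of processing, so the barrier is not held up
            // behind a slow (backpressured) user function.
            drainedInFlightData.add(serializedRecord);
        } else {
            // Post-barrier data on an already-aligned channel: held back until the
            // checkpoint completes, as in today's aligned checkpoints.
            holdBack(channel, serializedRecord);
        }
    }

    void onBarrier(String channel, long checkpointId) {
        checkpointInProgress = true;
        channelsAwaitingBarrier.remove(channel);
        if (channelsAwaitingBarrier.isEmpty()) {
            // Alignment done: snapshot the operator state plus the drained records;
            // the drained records are replayed into the operator on recovery.
            persistCheckpoint(checkpointId, drainedInFlightData);
            checkpointInProgress = false;
        }
    }

    private void process(byte[] record) { /* hand over to the user function */ }
    private void holdBack(String channel, byte[] record) { /* buffer until alignment ends */ }
    private void persistCheckpoint(long checkpointId, Queue<byte[]> inFlight) { /* write to the state backend */ }
}

Compared to plain alignment, the backpressured channel's backlog is written to the checkpoint rather than processed, so the barrier can complete even when the user function is the bottleneck.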