Interesting problem! Thanks for bringing it up, Thomas.

Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] 
would solve this problem more elegantly without sacrificing 
correctness.
- They do not need alignment, only (async) logging of in-flight records 
from the time the first barrier is processed until the last barrier arrives 
in a task.
- They work fine for failure recovery as long as logged records are replayed on 
startup.
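
To make the two points above concrete, here is a minimal Python sketch of
that logging scheme for a single task. It is not Flink code: SummingTask,
Snapshot, on_record, on_barrier and restore are hypothetical names, the
operator state is just a running sum, and only one snapshot is assumed to be
in progress at a time.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    state: int                                     # state copied at the first barrier
    in_flight: dict = field(default_factory=dict)  # channel -> logged records


class SummingTask:
    """Hypothetical task whose only state is a running sum."""

    def __init__(self, num_channels, state=0):
        self.num_channels = num_channels
        self.state = state
        self.pending = None    # channels still owing a barrier; None when idle
        self.snapshot = None   # snapshot currently being assembled
        self.completed = []    # finished snapshots

    def on_record(self, channel, value):
        # No alignment: every record is processed as soon as it arrives.
        if self.pending is not None and channel in self.pending:
            # Pre-barrier record that arrived after the state copy: log it.
            self.snapshot.in_flight[channel].append(value)
        self.state += value

    def on_barrier(self, channel):
        if self.pending is None:
            # First barrier: copy the state, start logging the other channels.
            others = set(range(self.num_channels)) - {channel}
            self.snapshot = Snapshot(self.state, {c: [] for c in others})
            self.pending = others
        else:
            self.pending.discard(channel)
        if not self.pending:
            # Last barrier arrived: the snapshot is complete.
            self.completed.append(self.snapshot)
            self.pending = None
            self.snapshot = None

    @classmethod
    def restore(cls, num_channels, snapshot):
        # Recovery: restore the state, then replay the logged records.
        task = cls(num_channels, snapshot.state)
        for channel, records in snapshot.in_flight.items():
            for value in records:
                task.on_record(channel, value)
        return task
```

With two channels, if channel 0 delivers 1 and then its barrier while
channel 1 delivers 2 and 3 before its own barrier, the snapshot holds state 1
plus the logged records [2, 3]. Replaying them on restore gives 6, the sum of
all pre-barrier records, even though the task never blocked a channel.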

Flink’s “aligned” savepoints would probably still be necessary for 
transactional sink commits + any sort of reconfiguration (e.g., rescaling, 
updating the logic of operators to evolve an application, etc.).

I don’t completely understand the “overtaking” approach, but if you have a 
concrete definition I would be happy to check it out and help if I can!
Mind that Chandy-Lamport essentially does this already: records arriving on 
channels whose barrier is still pending are logged into the task snapshot 
until that barrier arrives.

-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm 

> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
> 
> Hi Thomas,
> 
> As Zhijiang has responded, we are now in the process of discussing how to 
> address this issue, and one of the solutions that we are discussing is exactly 
> what you are proposing: checkpoint barriers overtaking the in-flight data and 
> making the in-flight data part of the checkpoint.
> 
> If everything works well, we will be able to present the results of our 
> discussions on the dev mailing list soon. 
> 
> Piotrek
> 
>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> 
>> wrote:
>> 
>> Hi Thomas,
>> 
>> Thanks for raising this concern. Barrier alignment takes a long time under 
>> backpressure, which can cause several problems:
>> 1. Checkpoint timeouts, as you mentioned.
>> 2. The recovery cost after a failover is high, because much data needs to be 
>> replayed.
>> 3. The delay for commit-based sinks is high in exactly-once mode.
>> 
>> With credit-based flow control since release-1.5, the number of in-flight 
>> buffers ahead of the barrier is reduced, so checkpoints already get 
>> somewhat faster.
>> 
>> In release-1.8, I guess we do not actually suspend the channels that have 
>> already received the barrier. We did do something similar before to speed 
>> up barrier alignment, but I am not quite sure whether release-1.8 
>> covers this feature. There were some relevant discussions under jira [1].
>> 
>> For release-1.10, the community is now discussing the feature of unaligned 
>> checkpoints, which is mainly meant to resolve the above concerns. The basic 
>> idea is to let the barrier overtake the output/input buffer queues to speed 
>> up alignment, and to snapshot the input/output buffers as part of the 
>> checkpoint state. The details have not been confirmed yet and are still 
>> under discussion. We hope to make some improvements for release-1.10.
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>> 
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Thomas Weise <t...@apache.org>
>> Send Time: 12 Aug 2019 (Monday) 21:38
>> To:dev <dev@flink.apache.org>
>> Subject:Checkpointing under backpressure
>> 
>> Hi,
>> 
>> One of the major operational difficulties we observe with Flink is
>> checkpoint timeouts under backpressure. I'm looking for both confirmation
>> of my understanding of the current behavior as well as pointers for future
>> improvement work:
>> 
>> Prior to introduction of credit based flow control in the network stack [1]
>> [2], checkpoint barriers would back up with the data for all logical
>> channels due to TCP backpressure. Since Flink 1.5, the buffers are
>> controlled per channel, and checkpoint barriers are only held back for
>> channels that have backpressure, while others can continue processing
>> normally. However, checkpoint barriers still cannot "overtake data",
>> therefore checkpoint alignment remains affected for the channel with
>> backpressure, with the potential for slow checkpointing and timeouts.
>> That said, the delay of barriers would be capped by the maximum in-transit
>> buffers per channel, resulting in an improvement compared to previous
>> versions of Flink. Also, the backpressure based checkpoint alignment can
>> help the barrier advance faster on the receiver side (by suspending
>> channels that have already delivered the barrier). Is that accurate as of
>> Flink 1.8?
>> 
>> What appears to be missing to completely unblock checkpointing is a
>> mechanism for checkpoints to overtake the data. That would help in
>> situations where the processing itself is the bottleneck and prioritization
>> in the network stack alone cannot address the barrier delay. Was there any
>> related discussion? One possible solution would be to drain incoming data
>> till the barrier and make it part of the checkpoint instead of processing
>> it. This is somewhat related to asynchronous processing, but I'm thinking
>> more of a solution that is automated in the Flink runtime for the
>> backpressure scenario only.
>> 
>> Thanks,
>> Thomas
>> 
>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
>> [2]
>> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
>> 
> 
