yes! It’s quite similar I think.  Though mind that the devil is in the details, 
i.e., the temporal order actions are taken.

To clarify, let us say you have a task T with two input channels I1 and I2.
The Chandy Lamport execution flow is the following:

1) T receives barrier from  I1 and...
2)  ...the following three actions happen atomically
        I )  T snapshots its state T*
        II)  T forwards marker to its outputs
        III) T starts logging all events of I2 (only) into a buffer M*
- Also notice here that T does NOT block I1 as it does in aligned snapshots -
3) Eventually T receives barrier from I2 and stops recording events. Its 
asynchronously captured snapshot is now complete: {T*,M*}. 
Upon recovery all messages of M* should be replayed in FIFO order.

With this approach alignment does not create a deadlock situation since anyway 
2.II happens asynchronously and messages can be logged as well asynchronously 
during the process of the snapshot. If there is back-pressure in a pipeline the 
cause is most probably not this algorithm.

Back to your observation, the answer : yes and no.  In your network model, I 
can see the logic of “logging” and “committing” a final snapshot being provided 
by the channel implementation. However, do mind that the first barrier always 
needs to go “all the way” to initiate the Chandy Lamport algorithm logic.

The above flow has been proven using temporal logic in my phd thesis in case 
you are interested about the proof.
I hope this helps a little clarifying things. Let me know if there is any 
confusing point to disambiguate. I would be more than happy to help if I can.

Paris

> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:
> 
> Thanks for the input. Regarding the Chandy-Lamport snapshots don’t you still 
> have to wait for the “checkpoint barrier” to arrive in order to know when 
> have you already received all possible messages from the upstream 
> tasks/operators? So instead of processing the “in flight” messages (as the 
> Flink is doing currently), you are sending them to an “observer”?
> 
> In that case, that’s sounds similar to “checkpoint barriers overtaking in 
> flight records” (aka unaligned checkpoints). Just for us, the observer is a 
> snapshot state.
> 
> Piotrek
> 
>> On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:
>> 
>> Interesting problem! Thanks for bringing it up Thomas.
>> 
>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport snapshots [1] 
>> would help out solve this problem more elegantly without sacrificing 
>> correctness.
>> - They do not need alignment, only (async) logging for in-flight records 
>> between the time the first barrier is processed until the last barrier 
>> arrives in a task.
>> - They work fine for failure recovery as long as logged records are replayed 
>> on startup.
>> 
>> Flink’s “alligned” savepoints would probably be still necessary for 
>> transactional sink commits + any sort of reconfiguration (e.g., rescaling, 
>> updating the logic of operators to evolve an application etc.).
>> 
>> I don’t completely understand the “overtaking” approach but if you have a 
>> concrete definition I would be happy to check it out and help if I can!
>> Mind that Chandy-Lamport essentially does this by logging things in pending 
>> channels in a task snapshot before the barrier arrives.
>> 
>> -Paris
>> 
>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm 
>> <https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm>
>> 
>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
>>> 
>>> Hi Thomas,
>>> 
>>> As Zhijiang has responded, we are now in the process of discussing how to 
>>> address this issue and one of the solution that we are discussing is 
>>> exactly what you are proposing: checkpoint barriers overtaking the in 
>>> flight data and make the in flight data part of the checkpoint.
>>> 
>>> If everything works well, we will be able to present result of our 
>>> discussions on the dev mailing list soon. 
>>> 
>>> Piotrek
>>> 
>>>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> 
>>>> wrote:
>>>> 
>>>> Hi Thomas,
>>>> 
>>>> Thanks for proposing this concern. The barrier alignment takes long time 
>>>> in backpressure case which could cause several problems:
>>>> 1. Checkpoint timeout as you mentioned.
>>>> 2. The recovery cost is high once failover, because much data needs to be 
>>>> replayed.
>>>> 3. The delay for commit-based sink is high in exactly-once.
>>>> 
>>>> For credit-based flow control from release-1.5, the amount of in-flighting 
>>>> buffers before barrier alignment is reduced, so we could get a bit
>>>> benefits from speeding checkpoint aspect.
>>>> 
>>>> In release-1.8, I guess we did not suspend the channels which already 
>>>> received the barrier in practice. But actually we ever did the similar 
>>>> thing
>>>> to speed barrier alighment before. I am not quite sure that release-1.8 
>>>> covers this feature. There were some relevant discussions under jira [1].
>>>> 
>>>> For release-1.10, the community is now discussing the feature of unaligned 
>>>> checkpoint which is mainly for resolving above concerns. The basic idea
>>>> is to make barrier overtakes the output/input buffer queue to speed 
>>>> alignment, and snapshot the input/output buffers as part of checkpoint 
>>>> state. The
>>>> details have not confirmed yet and is still under discussion. Wish we 
>>>> could make some improvments for the release-1.10.
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>>>> 
>>>> Best,
>>>> Zhijiang
>>>> ------------------------------------------------------------------
>>>> From:Thomas Weise <t...@apache.org>
>>>> Send Time:2019年8月12日(星期一) 21:38
>>>> To:dev <dev@flink.apache.org>
>>>> Subject:Checkpointing under backpressure
>>>> 
>>>> Hi,
>>>> 
>>>> One of the major operational difficulties we observe with Flink are
>>>> checkpoint timeouts under backpressure. I'm looking for both confirmation
>>>> of my understanding of the current behavior as well as pointers for future
>>>> improvement work:
>>>> 
>>>> Prior to introduction of credit based flow control in the network stack [1]
>>>> [2], checkpoint barriers would back up with the data for all logical
>>>> channels due to TCP backpressure. Since Flink 1.5, the buffers are
>>>> controlled per channel, and checkpoint barriers are only held back for
>>>> channels that have backpressure, while others can continue processing
>>>> normally. However, checkpoint barriers still cannot "overtake data",
>>>> therefore checkpoint alignment remains affected for the channel with
>>>> backpressure, with the potential for slow checkpointing and timeouts.
>>>> Albeit the delay of barriers would be capped by the maximum in-transit
>>>> buffers per channel, resulting in an improvement compared to previous
>>>> versions of Flink. Also, the backpressure based checkpoint alignment can
>>>> help the barrier advance faster on the receiver side (by suspending
>>>> channels that have already delivered the barrier). Is that accurate as of
>>>> Flink 1.8?
>>>> 
>>>> What appears to be missing to completely unblock checkpointing is a
>>>> mechanism for checkpoints to overtake the data. That would help in
>>>> situations where the processing itself is the bottleneck and prioritization
>>>> in the network stack alone cannot address the barrier delay. Was there any
>>>> related discussion? One possible solution would be to drain incoming data
>>>> till the barrier and make it part of the checkpoint instead of processing
>>>> it. This is somewhat related to asynchronous processing, but I'm thinking
>>>> more of a solution that is automated in the Flink runtime for the
>>>> backpressure scenario only.
>>>> 
>>>> Thanks,
>>>> Thomas
>>>> 
>>>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
>>>> [2]
>>>> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
>>>> 
>>> 
>> 
> 

Reply via email to