[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230235#comment-17230235 ]
Arvid Heise commented on FLINK-20097:
-------------------------------------
You are right that the code currently does not behave well when there are two
concurrent UCs. I actually wanted to make it possible to process concurrent
checkpoints throughout the 1.12 code, but that didn't make it through the
review. I assume that, as long as we don't know whether we want to support
concurrent UC, the code was considered unnecessary.
I'm unsure why things fail with only one checkpoint. It could be related to
cancellation, such that two UC barriers actually arrive at some task.
> Race conditions in InputChannel.ChannelStatePersister
> -----------------------------------------------------
>
> Key: FLINK-20097
> URL: https://issues.apache.org/jira/browse/FLINK-20097
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier()
> always update pendingCheckpointBarrierId, potentially overwriting a newer id
> (or the BARRIER_RECEIVED value) with an older one.
> For stopPersisting(), consider the following case:
> # Two consecutive UC barriers arrive at the same channel (the 1st one
> becoming stale at some point)
> # In RemoteInputChannel.onBuffer, the netty thread updates
> pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint,
> calling stopPersisting() from the UC controller and setting
> pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(),
> setting pendingCheckpointBarrierId to 2
> # Now new buffers can be included in the 2nd checkpoint, even though they
> belong to the next one
>
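The first scenario can be replayed single-threaded against a toy model. This is a hypothetical simplification, not Flink's actual ChannelStatePersister: buffers are strings, barriers are passed as bare ids, and `maybePersist`, `persisted` and `StopPersistingRace` are illustrative names. The model also assumes that startPersisting() leaves an existing BARRIER_RECEIVED state untouched; that assumption is what makes losing the marker in stopPersisting() harmful.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the pre-fix state handling (hypothetical, not Flink's code). */
class NaivePersister {
    static final long CHECKPOINT_COMPLETED = -1L;
    static final long BARRIER_RECEIVED = -2L;

    // One field encodes either a pending checkpoint id (>= 0) or a status sentinel.
    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    final List<String> persisted = new ArrayList<>();

    // Assumption of this model: a channel whose barrier was already received is not re-armed.
    void startPersisting(long barrierId) {
        if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
            pendingCheckpointBarrierId = barrierId;
        }
    }

    // Unconditional write: wipes out BARRIER_RECEIVED (or a newer id).
    void stopPersisting() {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    // Unconditional write: the barrier id is never compared with the pending id.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Buffers are captured only while a checkpoint is pending.
    void maybePersist(String buffer) {
        if (pendingCheckpointBarrierId >= 0) {
            persisted.add(buffer);
        }
    }
}

class StopPersistingRace {
    static List<String> replay() {
        NaivePersister channel = new NaivePersister();
        channel.checkForBarrier(1);  // netty thread: barrier 1 arrives
        channel.checkForBarrier(2);  // netty thread: barrier 2 arrives, state is BARRIER_RECEIVED
        channel.startPersisting(1);  // task thread: checkpoint 1 triggered (no-op, barrier already seen)
        channel.stopPersisting();    // task thread: barrier 2 aborts checkpoint 1, marker is lost
        channel.startPersisting(2);  // task thread: checkpoint 2 starts, pending id becomes 2
        channel.maybePersist("after-barrier-2"); // data that follows barrier 2
        return channel.persisted;
    }
}
```

`StopPersistingRace.replay()` returns the buffer that followed barrier 2, i.e. data that should not be part of checkpoint 2.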
> For checkForBarrier(), consider an input gate with two channels, A and B,
> and two barriers, 1 and 2:
> # Channel A receives both barriers; channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the
> 2nd checkpoint
> # Channel A's state is now BARRIER_RECEIVED; channel B's is pending (with
> id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # No buffers in B between barriers 1 and 2 will be included in the
> checkpoint
> # Channel B receives the 2nd barrier, which eventually concludes the
> checkpoint
>
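The second scenario can be modeled the same way. Again this is a hypothetical toy model, not Flink's code, with illustrative names (`NaiveChannel`, `maybePersist`, `CheckForBarrierRace`). It starts from the state in step 3 (A has seen both barriers, B is pending with id 2) instead of replaying how checkpoint 1 was triggered and aborted.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one channel's pre-fix state handling (hypothetical, not Flink's code). */
class NaiveChannel {
    static final long CHECKPOINT_COMPLETED = -1L;
    static final long BARRIER_RECEIVED = -2L;

    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    final List<String> persisted = new ArrayList<>();

    void startPersisting(long barrierId) {
        pendingCheckpointBarrierId = barrierId;
    }

    // Unconditional write: a stale barrier still ends a newer pending checkpoint.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Buffers are captured only while a checkpoint is pending.
    void maybePersist(String buffer) {
        if (pendingCheckpointBarrierId >= 0) {
            persisted.add(buffer);
        }
    }
}

class CheckForBarrierRace {
    static List<String> replayChannelB() {
        NaiveChannel a = new NaiveChannel();
        NaiveChannel b = new NaiveChannel();
        a.checkForBarrier(1);                // A receives barrier 1
        a.checkForBarrier(2);                // A receives barrier 2, state is BARRIER_RECEIVED
        b.startPersisting(2);                // checkpoint 2 triggered: B is pending with id 2
        b.checkForBarrier(1);                // stale barrier 1 reaches B, state is BARRIER_RECEIVED
        b.maybePersist("b-between-1-and-2"); // belongs to checkpoint 2, but is not captured
        b.checkForBarrier(2);                // barrier 2 concludes the checkpoint
        return b.persisted;
    }
}
```

`replayChannelB()` returns an empty list: the buffer between barriers 1 and 2 on channel B is lost because the stale barrier 1 already flipped B to BARRIER_RECEIVED.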
> One solution is to perform an action only if the passed checkpointId >=
> pendingCheckpointId. This requires a separate field to hold the status
> (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe anyway, so adding
> a second field shouldn't be a problem.
>
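A minimal sketch of the proposed fix, under the same toy-model assumptions (hypothetical names, strings as buffers, not Flink's code). The checkpoint id and the status are kept in separate fields, and a transition is applied only if the passed id is at least the recorded one. Two details are assumptions of this sketch: stopPersisting() takes the aborted checkpoint id, and startPersisting() uses a strict comparison, because an equal id means that barrier has already been seen. Like the class described above, the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed fix: id and status in separate fields. */
class GuardedPersister {
    enum Status { PENDING, RECEIVED, COMPLETED }

    private long checkpointId = -1L;          // newest checkpoint id recorded so far
    private Status status = Status.COMPLETED; // what is known about checkpointId
    final List<String> persisted = new ArrayList<>();

    // Strictly newer ids only: if this id's barrier was already received, stay RECEIVED.
    void startPersisting(long id) {
        if (id > checkpointId) {
            checkpointId = id;
            status = Status.PENDING;
        }
    }

    // Stale aborts (id < checkpointId) no longer clobber newer state.
    void stopPersisting(long id) {
        if (id >= checkpointId) {
            checkpointId = id;
            status = Status.COMPLETED;
        }
    }

    // Stale barriers (id < checkpointId) no longer end a newer pending checkpoint.
    void checkForBarrier(long id) {
        if (id >= checkpointId) {
            checkpointId = id;
            status = Status.RECEIVED;
        }
    }

    // Buffers are captured only while the recorded checkpoint is pending.
    void maybePersist(String buffer) {
        if (status == Status.PENDING) {
            persisted.add(buffer);
        }
    }
}

class GuardedReplay {
    static List<String> scenario1() {
        GuardedPersister ch = new GuardedPersister();
        ch.checkForBarrier(1);
        ch.checkForBarrier(2);
        ch.startPersisting(1);  // ignored: barrier 2 already recorded
        ch.stopPersisting(1);   // ignored: stale abort
        ch.startPersisting(2);  // ignored: barrier 2 already received
        ch.maybePersist("after-barrier-2");
        return ch.persisted;    // stays empty
    }

    static List<String> scenario2ChannelB() {
        GuardedPersister b = new GuardedPersister();
        b.startPersisting(2);
        b.checkForBarrier(1);   // ignored: stale barrier
        b.maybePersist("b-between-1-and-2");
        b.checkForBarrier(2);
        b.maybePersist("b-after-2");
        return b.persisted;     // only the buffer between barriers 1 and 2
    }
}
```

In the first scenario, the stale abort and the re-armed checkpoint 2 are ignored, so nothing after barrier 2 is captured; in the second, the stale barrier 1 is ignored, so channel B captures exactly the buffer between barriers 1 and 2.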