[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise resolved FLINK-20097.
---------------------------------
Resolution: Fixed
> Race conditions in InputChannel.ChannelStatePersister
> -----------------------------------------------------
>
> Key: FLINK-20097
> URL: https://issues.apache.org/jira/browse/FLINK-20097
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier()
> always update pendingCheckpointBarrierId, potentially overwriting a newer id
> (or the BARRIER_RECEIVED value) with an older one.
> For stopPersisting(), consider the following case:
> # Two consecutive unaligned checkpoint (UC) barriers arrive at the same
> channel (the 1st one becoming stale at some point)
> # In RemoteInputChannel.onBuffer, the Netty thread updates
> pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint,
> calling stopPersisting() from the UC controller and setting
> pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(),
> setting pendingCheckpointBarrierId to 2
> # Now new buffers may be included in the 2nd checkpoint (even though
> they belong to the next one)
>
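The stopPersisting() interleaving above can be replayed in a single thread against a simplified model of the persister. This is a hypothetical sketch, not Flink's code: the class BuggyPersister, its maybePersist() method, the numeric values chosen for CHECKPOINT_COMPLETED and BARRIER_RECEIVED, and the rule that startPersisting() leaves BARRIER_RECEIVED untouched are assumptions made only to reproduce the steps listed; the field name and the two special states come from the report.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded model of the pre-fix persister (not Flink's actual class). */
public class StaleStopPersistingDemo {

    static final class BuggyPersister {
        // Assumed encoding: one long holds either a pending barrier id or a special state.
        static final long CHECKPOINT_COMPLETED = -1L;
        static final long BARRIER_RECEIVED = -2L;

        long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
        final List<String> persisted = new ArrayList<>();

        // Task thread. Assumption: an already received barrier is not reset to pending.
        void startPersisting(long barrierId) {
            if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
                pendingCheckpointBarrierId = barrierId;
            }
        }

        // Task thread: unconditionally overwrites the state, which is the reported bug.
        void stopPersisting() {
            pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
        }

        // Netty thread: marks any barrier as received; the id is ignored.
        void checkForBarrier(long barrierId) {
            pendingCheckpointBarrierId = BARRIER_RECEIVED;
        }

        // Netty thread (hypothetical helper): persist only while a barrier id is pending.
        void maybePersist(String buffer) {
            if (pendingCheckpointBarrierId >= 0) {
                persisted.add(buffer);
            }
        }
    }

    /** Replays the stopPersisting() scenario and returns the buffers that got persisted. */
    static List<String> replay() {
        BuggyPersister p = new BuggyPersister();
        p.checkForBarrier(1);              // Netty thread sees barrier 1 ...
        p.checkForBarrier(2);              // ... and barrier 2: state is BARRIER_RECEIVED
        p.startPersisting(1);              // checkpoint 1 triggered: state unchanged
        p.stopPersisting();                // checkpoint 1 aborted: CHECKPOINT_COMPLETED
        p.startPersisting(2);              // checkpoint 2 started: pending id 2
        p.maybePersist("after-barrier-2"); // buffer that belongs to the next checkpoint
        return p.persisted;
    }

    public static void main(String[] args) {
        // Prints [after-barrier-2]: a post-barrier buffer leaked into checkpoint 2.
        System.out.println(replay());
    }
}
```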
> For checkForBarrier(), consider an input gate with two channels A
> and B and two barriers 1 and 2:
> # Channel A receives both barriers, channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the 2nd
> checkpoint
> # Channel A's state is now BARRIER_RECEIVED; channel B's is pending (with id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # No buffers in B between barriers 1 and 2 will be included in the
> checkpoint
> # Channel B receives the 2nd barrier, which will eventually conclude the
> checkpoint
>
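The checkForBarrier() interleaving can be replayed for channel B alone. This is a hypothetical single-threaded sketch, not Flink's code: BuggyPersister, maybePersist(), and the encoding of the special states as negative longs are assumptions. Channel A is not modeled; B's state after A's barriers have been processed is taken directly from the report (pending with id=2).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of channel B's persister before the fix (not Flink's actual class). */
public class StaleBarrierDemo {

    static final class BuggyPersister {
        // Assumed encoding of the special states.
        static final long CHECKPOINT_COMPLETED = -1L;
        static final long BARRIER_RECEIVED = -2L;

        long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
        final List<String> persisted = new ArrayList<>();

        void startPersisting(long barrierId) {
            if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
                pendingCheckpointBarrierId = barrierId;
            }
        }

        // The bug: a stale barrier overwrites a newer pending id without comparing ids.
        void checkForBarrier(long barrierId) {
            pendingCheckpointBarrierId = BARRIER_RECEIVED;
        }

        // Hypothetical helper: persist only while a barrier id is pending.
        void maybePersist(String buffer) {
            if (pendingCheckpointBarrierId >= 0) {
                persisted.add(buffer);
            }
        }
    }

    /** Replays the checkForBarrier() scenario on channel B. */
    static List<String> replayChannelB() {
        BuggyPersister b = new BuggyPersister();
        b.startPersisting(2);              // after A's barriers: B pending with id=2
        b.checkForBarrier(1);              // stale barrier 1 arrives: BARRIER_RECEIVED
        b.maybePersist("between-1-and-2"); // should be in checkpoint 2, but is dropped
        b.checkForBarrier(2);              // barrier 2 concludes checkpoint 2 on B
        return b.persisted;
    }

    public static void main(String[] args) {
        // Prints []: the in-flight buffer between barriers 1 and 2 is missing.
        System.out.println(replayChannelB());
    }
}
```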
> One solution is to perform an action only if the passed checkpointId >=
> pendingCheckpointId. This requires a separate field to hold the
> status (RECEIVED/COMPLETED/PENDING). Since the class isn't thread-safe anyway,
> the extra field shouldn't be a problem.
>
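The proposed fix can be sketched by splitting the status into its own field and guarding every update with an id comparison, so that stale calls become no-ops. This is a minimal sketch under assumptions, not the code that was merged: GuardedPersister, lastSeenBarrier, the Status enum, and maybePersist() are hypothetical names, and both scenarios from the report are replayed sequentially in one thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed fix (not Flink's actual class). */
public class GuardedPersisterDemo {

    enum Status { COMPLETED, PENDING, RECEIVED }

    static final class GuardedPersister {
        Status status = Status.COMPLETED;   // status kept separately from the id
        long lastSeenBarrier = -1L;
        final List<String> persisted = new ArrayList<>();

        // Task thread: only a strictly newer checkpoint moves the channel to PENDING.
        void startPersisting(long checkpointId) {
            if (checkpointId > lastSeenBarrier) {
                lastSeenBarrier = checkpointId;
                status = Status.PENDING;
            }
        }

        // Task thread: aborting an older checkpoint must not clobber a newer state.
        void stopPersisting(long checkpointId) {
            if (checkpointId >= lastSeenBarrier) {
                lastSeenBarrier = checkpointId;
                status = Status.COMPLETED;
            }
        }

        // Netty thread: a stale barrier (id < lastSeenBarrier) is ignored.
        void checkForBarrier(long barrierId) {
            if (barrierId >= lastSeenBarrier) {
                lastSeenBarrier = barrierId;
                status = Status.RECEIVED;
            }
        }

        // Hypothetical helper: persist only while waiting for the barrier.
        void maybePersist(String buffer) {
            if (status == Status.PENDING) {
                persisted.add(buffer);
            }
        }
    }

    /** stopPersisting() scenario: nothing after barrier 2 may be persisted. */
    static List<String> replayStopPersisting() {
        GuardedPersister p = new GuardedPersister();
        p.checkForBarrier(1);
        p.checkForBarrier(2);              // RECEIVED, lastSeenBarrier = 2
        p.startPersisting(1);              // stale: ignored
        p.stopPersisting(1);               // stale: ignored, stays RECEIVED
        p.startPersisting(2);              // not newer: stays RECEIVED
        p.maybePersist("after-barrier-2");
        return p.persisted;
    }

    /** checkForBarrier() scenario on channel B: the buffer between 1 and 2 is kept. */
    static List<String> replayChannelB() {
        GuardedPersister b = new GuardedPersister();
        b.startPersisting(2);              // PENDING, lastSeenBarrier = 2
        b.checkForBarrier(1);              // stale barrier: ignored
        b.maybePersist("between-1-and-2"); // persisted into checkpoint 2
        b.checkForBarrier(2);              // RECEIVED
        b.maybePersist("after-barrier-2"); // not persisted
        return b.persisted;
    }

    public static void main(String[] args) {
        System.out.println(replayStopPersisting()); // []
        System.out.println(replayChannelB());       // [between-1-and-2]
    }
}
```

One deviation from the plain `>=` rule: startPersisting() here uses a strict `>` so that starting a checkpoint whose barrier was already received (same id) does not reset the channel from RECEIVED back to PENDING, which would reintroduce the first race.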
--
This message was sent by Atlassian Jira
(v8.3.4#803005)