[
https://issues.apache.org/jira/browse/FLINK-19701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yun Gao updated FLINK-19701:
----------------------------
Description:
Currently, CheckpointUnaligner interacts with RemoteInputChannel to persist the
input buffers. However, based on the current implementation, a problem seems to
arise in the following case:
{code:java}
1. There are 3 input channels.
2. Input channel 0 receives barrier 1 and processes it, which starts
checkpoint 1.
3. Input channel 1 receives barrier 1 and processes it. The state of its
input channel persister becomes BARRIER_RECEIVED and
numBuffersOvertaken(channel 1) = n_1.
4. However, input channel 2 receives nothing, checkpoint 1 expires, and a new
checkpoint is triggered.
5. Input channel 0 receives barrier 2; checkpoint 1 is abandoned and checkpoint
2 is started. However, the state of the input channels is not cleared in this
case. Channel 1 is therefore still BARRIER_RECEIVED with
numBuffersOvertaken(channel 1) = n_1, so it would persist only n_1
buffers for the new checkpoint 2.
{code}
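The scenario can be sketched as a small, self-contained model. This is only an illustration under stated assumptions, not Flink's actual code: the class StaleBarrierSketch, the ChannelPersister type, its fields and methods, and the buffer counts (n_1 = 3, later 8 queued buffers) are all hypothetical, and the reset shown is just one conceivable remedy, not necessarily the fix adopted in Flink.

```java
public class StaleBarrierSketch {

    // Hypothetical two-state view of a per-channel persister.
    enum State { NO_CHECKPOINT, BARRIER_RECEIVED }

    // Hypothetical, heavily simplified stand-in for one input channel's persister.
    static final class ChannelPersister {
        State state = State.NO_CHECKPOINT;
        int numBuffersOvertaken = 0;
        int queuedBuffers = 0;

        // A barrier arrives on this channel and overtakes the buffers queued right now.
        void onBarrier() {
            state = State.BARRIER_RECEIVED;
            numBuffersOvertaken = queuedBuffers;
        }

        // Clears per-checkpoint state (assumed remedy, not taken from the issue).
        void reset() {
            state = State.NO_CHECKPOINT;
            numBuffersOvertaken = 0;
        }

        // Buffers this channel would persist when a checkpoint starts.
        int buffersToPersist() {
            return state == State.BARRIER_RECEIVED ? numBuffersOvertaken : queuedBuffers;
        }
    }

    // Replays steps 1-5 and returns what channel 1 would persist for checkpoint 2.
    static int persistedByChannel1ForCheckpoint2(boolean resetOnNewCheckpoint) {
        // Step 1: three input channels.
        ChannelPersister[] channels = {
            new ChannelPersister(), new ChannelPersister(), new ChannelPersister()
        };

        // Steps 2-3: barrier 1 reaches channels 0 and 1; channel 1 holds n_1 = 3 buffers.
        channels[0].onBarrier();
        channels[1].queuedBuffers = 3;
        channels[1].onBarrier();

        // Step 4: channel 2 sees nothing, checkpoint 1 expires, more data queues on channel 1.
        channels[1].queuedBuffers = 8;

        // Step 5: barrier 2 arrives on channel 0 and starts checkpoint 2.
        if (resetOnNewCheckpoint) {
            for (ChannelPersister c : channels) {
                c.reset();
            }
        }
        channels[0].onBarrier();

        // Channel 1 has not yet seen barrier 2.
        return channels[1].buffersToPersist();
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + persistedByChannel1ForCheckpoint2(false));
        System.out.println("with reset:    " + persistedByChannel1ForCheckpoint2(true));
    }
}
```

In this model, channel 1 reports the stale n_1 = 3 for checkpoint 2 when its state is left untouched, and all 8 queued buffers once the per-channel state is cleared at the start of the new checkpoint.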
was:
Currently, CheckpointUnaligner interacts with RemoteInputChannel to persist the
input buffers. However, based on the current implementation, a problem seems to
arise in the following case:
{code:java}
1. There are 3 input channels.
2. Input channel 0 receives barrier 1 and processes it, which starts
checkpoint 1.
3. Input channel 1 receives barrier 1 and processes it. The state of its
input channel persister becomes BARRIER_RECEIVED and
numBuffersOvertaken(channel 1) = n_1.
4. However, input channel 2 receives nothing, checkpoint 1 expires, and a new
checkpoint is triggered.
5. Input channel 0 receives barrier 2; checkpoint 1 is abandoned and checkpoint
2 is started. However, the state of the input channels is not cleared in this
case. Channel 1 is therefore still BARRIER_RECEIVED with
numBuffersOvertaken(channel 1) = n_1, so it would persist only n_1
buffers in the channel.
{code}
> Unaligned Checkpoint might misuse the number of buffers to persist from the
> previous barrier
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-19701
> URL: https://issues.apache.org/jira/browse/FLINK-19701
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.0
> Reporter: Yun Gao
> Priority: Major
>
> Currently, CheckpointUnaligner interacts with RemoteInputChannel to persist
> the input buffers. However, based on the current implementation, a problem
> seems to arise in the following case:
> {code:java}
> 1. There are 3 input channels.
> 2. Input channel 0 receives barrier 1 and processes it, which starts
> checkpoint 1.
> 3. Input channel 1 receives barrier 1 and processes it. The state of its
> input channel persister becomes BARRIER_RECEIVED and
> numBuffersOvertaken(channel 1) = n_1.
> 4. However, input channel 2 receives nothing, checkpoint 1 expires, and a
> new checkpoint is triggered.
> 5. Input channel 0 receives barrier 2; checkpoint 1 is abandoned and
> checkpoint 2 is started. However, the state of the input channels is not
> cleared in this case. Channel 1 is therefore still BARRIER_RECEIVED with
> numBuffersOvertaken(channel 1) = n_1, so it would persist only n_1
> buffers for the new checkpoint 2.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)