[
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842009#comment-16842009
]
vinoyang edited comment on FLINK-10855 at 5/17/19 8:50 AM:
-----------------------------------------------------------
[~richtesn] I invite you is because it is related to the component we are
working recently. If you do not mind, I can give a more detailed description:
Considering there are two tasks which are snapshotting belong to one job for
one checkpoint.
Task1's behavior in JM end:
{code:java}
receiveDeclineMessage -> discardCheckpoint -> pendingCheckpoint.abort ->
dispose(in fianlly block) -> CheckpointStorageLocation#disposeOnFailure(delete
cp dir){code}
Task2's behavior in TM end:
write snapshot data to statebackend and create checkpoint location again and
send ack message to JM end.
Task2's behavior in JM end:
{code:java}
receiveAcknowledgeMessage -> discardSubtaskState (else branch)
{code}
The PendingCheckpoint#abort may not cause the job to fail and recover in
{{CheckpointFailureManager}}. And {{CheckpointCoordinator}} would not send a
cancel pending checkpoint message to other tasks who are doing a snapshot.
So I suggest we could introduce a cleanup mechanism.
was (Author: yanghua):
[~richtesn] I invite you is because it is related to the component we are
working. If you do not mind, I can give a more detailed description:
Consider there are two tasks which are snapshotting in one job for one
checkpoint.
Task1's behavior in JM end:
{code:java}
receiveDeclineMessage -> discardCheckpoint -> pendingCheckpoint.abort ->
dispose(in fianlly block) -> CheckpointStorageLocation#disposeOnFailure(delete
cp dir){code}
Task2's behavior in TM end:
write snapshot data to statebackend and create checkpoint location again and
send ack message to JM end.
Task2's behavior in JM end:
{code:java}
receiveAcknowledgeMessage -> discardSubtaskState (else branch)
{code}
The PendingCheckpoint#abort may not cause the job to fail and recover in
{{CheckpointFailureManager}}. And {{CheckpointCoordinator}} would not send a
cancel pending checkpoint message to other tasks who are doing a snapshot.
So I suggest we could introduce a cleanup mechanism.
> CheckpointCoordinator does not delete checkpoint directory of late/failed
> checkpoints
> -------------------------------------------------------------------------------------
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot
> be acknowledged, we discard the subtask state in the
> {{CheckpointCoordinator}}. What's not happening in this case is that we
> delete the parent directory of the checkpoint. This only happens when we
> dispose a {{PendingCheckpoint#dispose}}.
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not
> being ready) and we delete the checkpoint directory. Next another task writes
> its checkpoint data to the checkpoint directory (thereby creating it again)
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The
> {{CheckpointCoordinator}} will realize that there is no longer a
> {{PendingCheckpoint}} and will discard the sub task state. This will remove
> the state files from the checkpoint directory but will leave the checkpoint
> directory untouched.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)