[ 
https://issues.apache.org/jira/browse/FLINK-23815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-23815:
-------------------------------------
    Description: 
As discussed in 
https://github.com/apache/flink/pull/16655#issuecomment-899603149:

When dealing with a PendingCheckpoint, should one be forced to also deal with a 
CheckpointPlan at all? I think not, the PendingCheckpoint is purely a tracker 
of gathered states and how many tasks have acknowledged and how many we are 
still waiting for. That makes testing, reusability, etc. simple.

That means we should strive to have the PendingCheckpoint independent of 
CheckpointPlan. I think we can achieve this with the following steps:

    Much of the reason to have the CheckpointPlan in PendingCheckpoint is 
because the CheckpointCoordinator often requires access to the checkpoint plan 
for a pending checkpoint. And it just uses the PendingCheckpoint as a 
convenient way to store the CheckpointPlan.

    A cleaner solution would be changing in CheckpointCoordinator the Map<Long, 
PendingCheckpoint> pendingCheckpoints to a Map<Long, Tuple2<CheckpointPlan, 
PendingCheckpoint>> pendingCheckpoints. The CheckpointCoordinator would keep 
the two things it frequently needs together in a tuple, rather than storing one 
in the other and thus expanding the scope/concerns of PendingCheckpoint.

    The above change should allow us to reduce the interface of 
PendingCheckpoint.checkpointPlan to PendingCheckpointFinishedTaskStateProvider.

    The interface PendingCheckpointFinishedTaskStateProvider has some methods 
that are about tracking state, not just about giving access to finished state: 
void reportTaskFinishedOnRestore(ExecutionVertex task) and void 
reportTaskHasFinishedOperators(ExecutionVertex task);.

    I am wondering if those two would not be more cleanly handled outside the 
PendingCheckpoint as well. Meaning in the method 
CheckpointCoordinator.receiveAcknowledgeMessage() we handle the reporting of 
finished operators to the CheckpointPlan. The method 
PendingCheckpoint.acknowledgeTask() only deals with the states.


  was:As discussed in 
https://github.com/apache/flink/pull/16655#issuecomment-899603149, we could 
further refactor the current implementation and tests to be more clear. 


> Separate the concerns of PendingCheckpoint and CheckpointPlan
> -------------------------------------------------------------
>
>                 Key: FLINK-23815
>                 URL: https://issues.apache.org/jira/browse/FLINK-23815
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>            Reporter: Yun Gao
>            Priority: Major
>             Fix For: 1.15.0
>
>
> As discussed in 
> https://github.com/apache/flink/pull/16655#issuecomment-899603149:
> When dealing with a PendingCheckpoint, should one be forced to also deal with 
> a CheckpointPlan at all? I think not, the PendingCheckpoint is purely a 
> tracker of gathered states and how many tasks have acknowledged and how many 
> we are still waiting for. That makes testing, reusability, etc. simple.
> That means we should strive to have the PendingCheckpoint independent of 
> CheckpointPlan. I think we can achieve this with the following steps:
>     Much of the reason to have the CheckpointPlan in PendingCheckpoint is 
> because the CheckpointCoordinator often requires access to the checkpoint 
> plan for a pending checkpoint. And it just uses the PendingCheckpoint as a 
> convenient way to store the CheckpointPlan.
>     A cleaner solution would be changing in CheckpointCoordinator the 
> Map<Long, PendingCheckpoint> pendingCheckpoints to a Map<Long, 
> Tuple2<CheckpointPlan, PendingCheckpoint>> pendingCheckpoints. The 
> CheckpointCoordinator would keep the two things it frequently needs together 
> in a tuple, rather than storing one in the other and thus expanding the 
> scope/concerns of PendingCheckpoint.
>     The above change should allow us to reduce the interface of 
> PendingCheckpoint.checkpointPlan to 
> PendingCheckpointFinishedTaskStateProvider.
>     The interface PendingCheckpointFinishedTaskStateProvider has some methods 
> that are about tracking state, not just about giving access to finished 
> state: void reportTaskFinishedOnRestore(ExecutionVertex task) and void 
> reportTaskHasFinishedOperators(ExecutionVertex task);.
>     I am wondering if those two would not be more cleanly handled outside the 
> PendingCheckpoint as well. Meaning in the method 
> CheckpointCoordinator.receiveAcknowledgeMessage() we handle the reporting of 
> finished operators to the CheckpointPlan. The method 
> PendingCheckpoint.acknowledgeTask() only deals with the states.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to