[
https://issues.apache.org/jira/browse/FLINK-23815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz updated FLINK-23815:
-------------------------------------
Description:
As discussed in
https://github.com/apache/flink/pull/16655#issuecomment-899603149:
{quote}
When dealing with a PendingCheckpoint, should one be forced to also deal with a
CheckpointPlan at all? I think not: the PendingCheckpoint is purely a tracker
of the gathered states, of how many tasks have acknowledged, and of how many we
are still waiting for. Keeping it that way makes testing, reuse, etc. simple.
That means we should strive to have the PendingCheckpoint independent of
CheckpointPlan. I think we can achieve this with the following steps:
- Much of the reason to have the CheckpointPlan in PendingCheckpoint is that
  the CheckpointCoordinator often requires access to the checkpoint plan for a
  pending checkpoint, and it simply uses the PendingCheckpoint as a convenient
  place to store the CheckpointPlan.
  A cleaner solution would be to change, in CheckpointCoordinator, the
  Map<Long, PendingCheckpoint> pendingCheckpoints to a
  Map<Long, Tuple2<CheckpointPlan, PendingCheckpoint>> pendingCheckpoints. The
  CheckpointCoordinator would then keep the two things it frequently needs
  together in a tuple, rather than storing one in the other and thus expanding
  the scope/concerns of PendingCheckpoint.
- The above change should allow us to reduce the interface of
  PendingCheckpoint.checkpointPlan to PendingCheckpointFinishedTaskStateProvider.
- The interface PendingCheckpointFinishedTaskStateProvider has some methods
  that are about tracking state, not just about giving access to finished
  state: void reportTaskFinishedOnRestore(ExecutionVertex task) and
  void reportTaskHasFinishedOperators(ExecutionVertex task).
  I am wondering whether those two would be more cleanly handled outside the
  PendingCheckpoint as well. That is, the method
  CheckpointCoordinator.receiveAcknowledgeMessage() would handle the reporting
  of finished operators to the CheckpointPlan, and the method
  PendingCheckpoint.acknowledgeTask() would deal only with the states.
{quote}
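The proposal quoted above could be sketched roughly as follows. This is only an
illustration under stated assumptions, not Flink's actual code: PlanSketch,
PendingSketch and CoordinatorSketch are hypothetical, heavily simplified
stand-ins for CheckpointPlan, PendingCheckpoint and CheckpointCoordinator,
tasks are identified by plain strings instead of ExecutionVertex, state is a
plain string, and java.util.Map.Entry stands in for Tuple2 so that the example
needs nothing beyond the JDK.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-ins only; none of these are Flink's real classes. */
public class SeparationSketch {

    /** Stand-in for CheckpointPlan: the only place told about finished operators. */
    static final class PlanSketch {
        private final Set<String> tasksWithFinishedOperators = new HashSet<>();

        void reportTaskHasFinishedOperators(String taskId) {
            tasksWithFinishedOperators.add(taskId);
        }

        Set<String> getTasksWithFinishedOperators() {
            return tasksWithFinishedOperators;
        }
    }

    /** Stand-in for PendingCheckpoint: tracks gathered states and open acks only. */
    static final class PendingSketch {
        private final Set<String> notYetAcknowledged;
        private final Map<String, String> states = new HashMap<>();

        PendingSketch(Set<String> tasksToAcknowledge) {
            this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
        }

        /** Returns false for unknown or duplicate acknowledgements. */
        boolean acknowledgeTask(String taskId, String state) {
            if (!notYetAcknowledged.remove(taskId)) {
                return false;
            }
            states.put(taskId, state);
            return true;
        }

        boolean isFullyAcknowledged() {
            return notYetAcknowledged.isEmpty();
        }

        Map<String, String> getStates() {
            return states;
        }
    }

    /** Stand-in for CheckpointCoordinator: keeps plan and pending checkpoint side by side. */
    static final class CoordinatorSketch {
        // Map.Entry plays the role of Tuple2<CheckpointPlan, PendingCheckpoint>.
        private final Map<Long, Map.Entry<PlanSketch, PendingSketch>> pendingCheckpoints =
                new HashMap<>();

        void register(long checkpointId, PlanSketch plan, PendingSketch pending) {
            pendingCheckpoints.put(checkpointId, new SimpleImmutableEntry<>(plan, pending));
        }

        boolean receiveAcknowledgeMessage(
                long checkpointId, String taskId, String state, boolean hasFinishedOperators) {
            Map.Entry<PlanSketch, PendingSketch> entry = pendingCheckpoints.get(checkpointId);
            if (entry == null) {
                return false; // no such pending checkpoint
            }
            // The pending checkpoint only ever sees the state.
            if (!entry.getValue().acknowledgeTask(taskId, state)) {
                return false;
            }
            // Reporting finished operators goes to the plan, outside the pending checkpoint.
            if (hasFinishedOperators) {
                entry.getKey().reportTaskHasFinishedOperators(taskId);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        PlanSketch plan = new PlanSketch();
        PendingSketch pending = new PendingSketch(new HashSet<>(Arrays.asList("a", "b")));
        CoordinatorSketch coordinator = new CoordinatorSketch();
        coordinator.register(1L, plan, pending);

        coordinator.receiveAcknowledgeMessage(1L, "a", "state-a", false);
        coordinator.receiveAcknowledgeMessage(1L, "b", "state-b", true);

        System.out.println("fully acknowledged: " + pending.isFullyAcknowledged());
        System.out.println("finished operators: " + plan.getTasksWithFinishedOperators());
    }
}
```

In this sketch PendingSketch has no reference to the plan at all, so it can be
constructed and tested with nothing but a set of task ids, which is the
testability benefit the comment argues for.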
> Separate the concerns of PendingCheckpoint and CheckpointPlan
> -------------------------------------------------------------
>
> Key: FLINK-23815
> URL: https://issues.apache.org/jira/browse/FLINK-23815
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Reporter: Yun Gao
> Priority: Major
> Fix For: 1.15.0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)