[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361295#comment-16361295 ]
ASF GitHub Bot commented on FLINK-8533:
---------------------------------------
GitHub user EronWright commented on the issue:
https://github.com/apache/flink/pull/5427
@StephanEwen, thanks for taking a look. I agree that we should try to avoid
a new lifecycle method.
The `initializeState` method on the hook interface gives the hook an
unconditional initialization point.
In the Pravega case, we would move reader-group (RG) initialization from the
client to the server side, i.e., the JobManager (JM), where the hook runs, and
always reset the RG to its initial conditions in `initializeState`. A restore
from a checkpoint may or may not follow.
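A minimal, self-contained Java sketch of that division of labor follows. It does not use Flink's real `MasterTriggerRestoreHook`: `InitializableHook`, `ReaderGroup`, and `ReaderGroupHook` are hypothetical stand-ins, and the reader group is modeled as a single in-memory stream position rather than actual Pravega state.

```java
// Hypothetical, simplified hook contract (NOT Flink's actual interface).
interface InitializableHook<T> {
    // Called unconditionally, before any restore attempt.
    void initializeState();

    // Called only when a completed checkpoint exists.
    void restoreCheckpoint(long checkpointId, T checkpointData);
}

// Hypothetical in-memory stand-in for an externally stored reader group.
final class ReaderGroup {
    private final long initialPosition;
    private long position;

    ReaderGroup(long initialPosition) {
        this.initialPosition = initialPosition;
        this.position = initialPosition;
    }

    void readUpTo(long newPosition) { position = newPosition; }
    void resetToInitial() { position = initialPosition; }
    void rewindTo(long checkpointedPosition) { position = checkpointedPosition; }
    long position() { return position; }
}

// Hook that owns reader-group initialization on the master (JM) side.
final class ReaderGroupHook implements InitializableHook<Long> {
    private final ReaderGroup readerGroup;

    ReaderGroupHook(ReaderGroup readerGroup) { this.readerGroup = readerGroup; }

    @Override
    public void initializeState() {
        // Always return to initial conditions; a restore may or may not follow.
        readerGroup.resetToInitial();
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Long checkpointedPosition) {
        // Rewind to the position recorded in the checkpoint.
        readerGroup.rewindTo(checkpointedPosition);
    }
}
```

The reset and the restore are deliberately separate, so a job that fails before its first checkpoint still gets the reset.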
Assuming we like this approach, let's discuss how to make it work purely
with `restoreLatestCheckpointedState`. The ExecutionGraph (EG) does not call
the CheckpointCoordinator's (CC) `restoreLatestCheckpointedState` method upon
initial execution, but it would need to in order to support the new
`initializeState` method. Would there be any issue with calling
`restoreLatestCheckpointedState` on initial execution? Such symmetry between
initial execution and restart seems desirable.
**Existing approach**:
```
=== initial ===
\-- JM.submitJob
| \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
| \-- EG.restart
| | \-- CC.restoreLatestCheckpointedState
| | \-- EG.scheduleForExecution
```
**Suggested approach**:
```
=== initial ===
\-- JM.submitJob
| \-- EG.start (** new method **)
| | \-- CC.restoreLatestCheckpointedState
| | | \-- Hook.initializeState
| | \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
| \-- EG.restart
| | \-- CC.restoreLatestCheckpointedState
| | | \-- Hook.initializeState
| | \-- EG.scheduleForExecution
```
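The suggested call order can be sketched in plain Java. This is a hypothetical model, not Flink code: `MasterHook`, `CoordinatorSketch`, `ExecutionGraphSketch`, and `TracingHook` are illustrative names, and the single `Long` field stands in for the CC's completed-checkpoint store. Both `start()` and `restart()` go through the same restore path, so every hook is initialized even when no checkpoint exists yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook contract; names mirror the proposal, not Flink's real API.
interface MasterHook {
    void initializeState();
    void restoreCheckpoint(long checkpointId);
}

// Hypothetical checkpoint coordinator (CC).
final class CoordinatorSketch {
    private final List<MasterHook> hooks = new ArrayList<>();
    private Long latestCompletedCheckpointId; // null until the first checkpoint completes

    void addHook(MasterHook hook) { hooks.add(hook); }
    void completeCheckpoint(long checkpointId) { latestCompletedCheckpointId = checkpointId; }

    void restoreLatestCheckpointedState() {
        // Every hook is (re)initialized, whether or not a checkpoint exists.
        for (MasterHook hook : hooks) {
            hook.initializeState();
        }
        // The restore step runs only once a checkpoint has completed.
        if (latestCompletedCheckpointId != null) {
            for (MasterHook hook : hooks) {
                hook.restoreCheckpoint(latestCompletedCheckpointId);
            }
        }
    }
}

// Hypothetical execution graph (EG): initial start and restart share one restore path.
final class ExecutionGraphSketch {
    private final CoordinatorSketch coordinator;
    private final List<String> trace;

    ExecutionGraphSketch(CoordinatorSketch coordinator, List<String> trace) {
        this.coordinator = coordinator;
        this.trace = trace;
    }

    void start() {   // proposed new method, reached from JM.submitJob
        coordinator.restoreLatestCheckpointedState();
        scheduleForExecution();
    }

    void restart() { // reached from RestartCallback.triggerFullRecovery
        coordinator.restoreLatestCheckpointedState();
        scheduleForExecution();
    }

    private void scheduleForExecution() { trace.add("schedule"); }
}

// Hook that records the calls it receives, to make the ordering visible.
final class TracingHook implements MasterHook {
    private final List<String> trace;

    TracingHook(List<String> trace) { this.trace = trace; }

    @Override public void initializeState() { trace.add("init"); }
    @Override public void restoreCheckpoint(long checkpointId) { trace.add("restore:" + checkpointId); }
}
```

A start, a restart before any checkpoint, and a restart after checkpoint 7 would record `init, schedule, init, schedule, init, restore:7, schedule`.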
> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Eron Wright
> Assignee: Eron Wright
> Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for
> taking or restoring checkpoints. When execution is restarted from a
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the
> external system state. There is an edge case where the external state is not
> adequately reinitialized, namely when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to its initial conditions.
> In this case, exactly-once semantics are lost. For example, in the Pravega
> source function, the reader group state (e.g., stream position data) is
> stored externally. In the normal restore case, the reader group state is
> forcibly rewound to the checkpointed position. In the edge case where no
> checkpoint has yet completed successfully, the reader group state is not
> rewound, and consequently some of the stream data read before the failure
> is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.
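The edge case can be reproduced with a small Java model. It is a hypothetical simulation, not Pravega or Flink code: `EdgeCaseSketch` tracks one externally stored reader position that survives restarts, and a constructor flag switches between the existing behavior (restore only when a checkpoint exists) and an unconditional `initializeState`-style reset.

```java
// Hypothetical model of the FLINK-8533 edge case: an external reader-group
// position that survives job restarts, unlike Flink's own (rolled-back) state.
final class EdgeCaseSketch {
    private final boolean hookHasInitializeState;
    private long readerGroupPosition = 0;   // external state, starts at the stream head
    private Long lastCheckpointedPosition;  // null: no checkpoint has completed yet

    EdgeCaseSketch(boolean hookHasInitializeState) {
        this.hookHasInitializeState = hookHasInitializeState;
    }

    void read(long records) { readerGroupPosition += records; }

    void checkpoint() { lastCheckpointedPosition = readerGroupPosition; }

    // Simulates a full restart after a failure.
    void restart() {
        if (hookHasInitializeState) {
            // Proposed: unconditional reset to initial conditions.
            readerGroupPosition = 0;
        }
        if (lastCheckpointedPosition != null) {
            // Both variants: restoreCheckpoint runs only if a checkpoint completed.
            readerGroupPosition = lastCheckpointedPosition;
        }
    }

    long position() { return readerGroupPosition; }
}
```

Reading 5 records and failing before the first checkpoint leaves the existing variant at position 5, so records 0 to 4 are never reprocessed; the variant with the reset rewinds to 0.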