ASF GitHub Bot commented on FLINK-8533:

Github user tillrohrmann commented on the issue:

    @EronWright, you're right that on initial submission we don't call 
`restoreLatestCheckpointedState` in the old code. With Flip-6 this will be the 
case. See #5444.
    The underlying assumption to make this work, though, is that a user won't 
submit a new job whose job id and cluster id match a pair for which ZooKeeper 
already contains persisted checkpoints from a previous run. So either the 
cluster id or the job id must be different. 
    I think so far, when using the Flink client this should be the case. 
However, when generating the `JobGraph` yourself and keeping it around to 
submit it to a standalone cluster, then this assumption will break because both 
the `JobID` and the cluster id will be the same.
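
    To illustrate the assumption, here is a minimal sketch of a checkpoint 
store keyed by cluster id and job id. It is not Flink's actual ZooKeeper 
layout; the class `CheckpointStoreSketch`, its methods and the key format are 
hypothetical and only show why reusing both ids finds the old checkpoints.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the persisted checkpoint store; not Flink code.
final class CheckpointStoreSketch {
    // Latest completed checkpoint id per (cluster id, job id) pair.
    private final Map<String, Long> latestByKey = new HashMap<>();

    // Assumed key format: both ids together identify the stored checkpoints.
    static String key(String clusterId, String jobId) {
        return clusterId + "/" + jobId;
    }

    void persist(String clusterId, String jobId, long checkpointId) {
        latestByKey.put(key(clusterId, jobId), checkpointId);
    }

    // Empty if nothing was persisted for this exact pair of ids.
    Optional<Long> latest(String clusterId, String jobId) {
        return Optional.ofNullable(latestByKey.get(key(clusterId, jobId)));
    }
}
```

    A fresh submission is only treated as fresh if `latest(...)` is empty, 
which holds as long as at least one of the two ids differs from the previous 
run.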

> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>                 Key: FLINK-8533
>                 URL: https://issues.apache.org/jira/browse/FLINK-8533
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>            Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
>
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
>
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.
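
The proposed fix can be sketched roughly as follows. This is a simplified 
illustration, not Flink's actual interface: `RestoreHook`, 
`ExternalReaderGroup`, `ReaderGroupHook` and `HookDemo` are hypothetical 
names, and `restoreCheckpoint` here takes a plain position instead of the 
hook's real checkpoint data type.

```java
// Hypothetical, simplified stand-in for the hook interface; not Flink's API.
interface RestoreHook {
    // Proposed in this issue: invoked unconditionally on hook initialization.
    void initializeState();

    // Existing behaviour: invoked only when a completed checkpoint is restored.
    void restoreCheckpoint(long checkpointId, long position);
}

// Toy external system holding a read position, standing in for a reader group.
final class ExternalReaderGroup {
    private long position;

    long position() { return position; }
    void advance(long n) { position += n; }
    void resetTo(long p) { position = p; }
}

final class ReaderGroupHook implements RestoreHook {
    private final ExternalReaderGroup group;

    ReaderGroupHook(ExternalReaderGroup group) { this.group = group; }

    @Override
    public void initializeState() {
        // Forcibly reinitialize the external state to initial conditions.
        group.resetTo(0L);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, long position) {
        // Rewind the external state to the checkpointed position.
        group.resetTo(position);
    }
}

final class HookDemo {
    // Simulates a restart; a null checkpointId means no checkpoint completed.
    static void restart(RestoreHook hook, Long checkpointId, Long position) {
        hook.initializeState();
        if (checkpointId != null) {
            hook.restoreCheckpoint(checkpointId, position);
        }
    }
}
```

In this sketch a restart before the first checkpoint still resets the reader 
position, which is the case the existing hook misses.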

This message was sent by Atlassian JIRA
