[ 
https://issues.apache.org/jira/browse/FLINK-5777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5777.
------------------------------
       Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed in 6e7a91741708a2b167a2bbca5dda5b2059df5e18.

> Pass savepoint information to CheckpointingOperation
> ----------------------------------------------------
>
>                 Key: FLINK-5777
>                 URL: https://issues.apache.org/jira/browse/FLINK-5777
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.3.0
>
>
> In order to make savepoints self-contained in a single directory, we need to 
> pass some information to {{StreamTask#CheckpointingOperation}}.
> I propose to extend the {{CheckpointMetaData}} for this.
> We currently have some overlap between CheckpointMetaData, CheckpointMetrics, 
> and the manually passed checkpoint ID and checkpoint timestamp. We should 
> restrict CheckpointMetaData to the integral metadata that needs to be passed 
> to StreamTask#CheckpointingOperation.
> This means that we move the CheckpointMetrics out of CheckpointMetaData; 
> the BarrierBuffer/BarrierTracker create the CheckpointMetrics separately and 
> send them back with the acknowledge message.
> CheckpointMetaData should be extended with the following properties:
> - boolean isSavepoint
> - String targetDirectory
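
The two proposed properties could be carried roughly as in the sketch below. This is a simplified, hypothetical stand-in and not the actual Flink class: the name CheckpointMetaDataSketch, the factory methods, and the choice of a null target directory for regular checkpoints are all assumptions made for illustration.

```java
// Hypothetical, simplified stand-in for the extended meta data object.
// It is NOT the Flink CheckpointMetaData class; names and null handling are assumptions.
final class CheckpointMetaDataSketch {
    private final long checkpointId;
    private final long timestamp;
    private final boolean isSavepoint;
    // Assumption: only set for savepoints, null for regular checkpoints.
    private final String targetDirectory;

    private CheckpointMetaDataSketch(
            long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }

    // Regular checkpoint: no savepoint flag, no target directory.
    static CheckpointMetaDataSketch forCheckpoint(long checkpointId, long timestamp) {
        return new CheckpointMetaDataSketch(checkpointId, timestamp, false, null);
    }

    // Savepoint: the flag is set and a target directory is required.
    static CheckpointMetaDataSketch forSavepoint(
            long checkpointId, long timestamp, String targetDirectory) {
        if (targetDirectory == null || targetDirectory.isEmpty()) {
            throw new IllegalArgumentException("A savepoint needs a target directory");
        }
        return new CheckpointMetaDataSketch(checkpointId, timestamp, true, targetDirectory);
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    boolean isSavepoint() { return isSavepoint; }
    String getTargetDirectory() { return targetDirectory; }
}
```

Keeping the object immutable means it can be passed unchanged along both trigger paths.
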
> There are two code paths that lead to the CheckpointingOperation:
> 1. From CheckpointCoordinator via RPC to StreamTask#triggerCheckpoint
>    - Execution#triggerCheckpoint(long, long)
>      => triggerCheckpoint(CheckpointMetaData)
>    - TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, long, long)
>      => TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID,
>         CheckpointMetaData)
>    - Task#triggerCheckpointBarrier(long, long)
>      => Task#triggerCheckpointBarrier(CheckpointMetaData)
> 2. From intermediate streams via the CheckpointBarrier to
>    StreamTask#triggerCheckpointOnBarrier
>    - triggerCheckpointOnBarrier(CheckpointMetaData)
>      => triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)
>    - CheckpointBarrier(long, long) => CheckpointBarrier(CheckpointMetaData)
>    - AcknowledgeCheckpoint(CheckpointMetaData)
>      => AcknowledgeCheckpoint(long, CheckpointMetrics)
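
For the barrier path, the separation of metrics from meta data might look like the sketch below. All types are hypothetical stand-ins, not Flink classes, and the two metric fields are assumed for illustration. In this sketch the task returns the acknowledgement to keep the example self-contained, whereas the description has it sent back as a message.

```java
// Hypothetical stand-ins for illustration only; none of these are Flink classes.
final class MetaSketch {
    final long checkpointId;
    final long timestamp;

    MetaSketch(long checkpointId, long timestamp) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
    }
}

// Metrics live in their own object instead of inside the meta data.
final class MetricsSketch {
    long alignmentDurationNanos;   // assumed field
    long bytesBufferedInAlignment; // assumed field
}

// Mirrors the proposed AcknowledgeCheckpoint(long, CheckpointMetrics) shape.
final class AckSketch {
    final long checkpointId;
    final MetricsSketch metrics;

    AckSketch(long checkpointId, MetricsSketch metrics) {
        this.checkpointId = checkpointId;
        this.metrics = metrics;
    }
}

final class TaskSketch {
    // Mirrors triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics).
    AckSketch triggerCheckpointOnBarrier(MetaSketch meta, MetricsSketch metrics) {
        // State snapshotting would happen here; the metrics travel with the ack.
        return new AckSketch(meta.checkpointId, metrics);
    }
}

// Plays the role of BarrierBuffer/BarrierTracker: it creates the metrics itself.
final class BarrierHandlerSketch {
    private final TaskSketch task;

    BarrierHandlerSketch(TaskSketch task) {
        this.task = task;
    }

    // Called once all barriers for a checkpoint have arrived.
    AckSketch onAllBarriersReceived(
            MetaSketch meta, long alignmentStartNanos, long nowNanos, long bufferedBytes) {
        MetricsSketch metrics = new MetricsSketch();
        metrics.alignmentDurationNanos = nowNanos - alignmentStartNanos;
        metrics.bytesBufferedInAlignment = bufferedBytes;
        return task.triggerCheckpointOnBarrier(meta, metrics);
    }
}
```
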
> The state backends provide another stream factory that is used in 
> CheckpointingOperation when the metadata indicates a savepoint. The state 
> backends can choose whether they return the regular checkpoint stream factory 
> in that case or a special one for savepoints. That way, backends that don’t 
> checkpoint to a file system can special-case savepoints easily.
> - FsStateBackend: returns a special FsCheckpointStreamFactory with a different 
> directory layout
> - MemoryStateBackend: returns the regular checkpoint stream factory 
> (MemCheckpointStreamFactory) => The _metadata file will contain all state, as 
> the state handles are part of it
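
The backend-side choice of stream factory could be sketched as follows. The interfaces and method names here (StateBackendSketch, createSavepointStreamFactory, describe) are hypothetical and only illustrate the idea. Using a default method that falls back to the regular factory is one possible design, not necessarily the one used in the fix.

```java
// Hypothetical stand-ins for illustration only; these are not Flink interfaces.
interface StreamFactorySketch {
    // Describes where the factory would write, for demonstration purposes.
    String describe();
}

interface StateBackendSketch {
    // Factory used for regular checkpoints.
    StreamFactorySketch createStreamFactory();

    // Factory used for savepoints. Assumption: by default the backend
    // simply reuses its regular checkpoint stream factory.
    default StreamFactorySketch createSavepointStreamFactory(String targetDirectory) {
        return createStreamFactory();
    }
}

// A memory-style backend keeps the default and returns the regular factory.
final class MemoryBackendSketch implements StateBackendSketch {
    @Override
    public StreamFactorySketch createStreamFactory() {
        return () -> "memory";
    }
}

// A file-system-style backend writes savepoints to the requested directory.
final class FsBackendSketch implements StateBackendSketch {
    private final String checkpointDirectory;

    FsBackendSketch(String checkpointDirectory) {
        this.checkpointDirectory = checkpointDirectory;
    }

    @Override
    public StreamFactorySketch createStreamFactory() {
        return () -> "fs:" + checkpointDirectory;
    }

    @Override
    public StreamFactorySketch createSavepointStreamFactory(String targetDirectory) {
        return () -> "fs:" + targetDirectory;
    }
}

final class CheckpointingOperationSketch {
    // Picks the factory based on the savepoint flag carried in the meta data.
    static StreamFactorySketch chooseFactory(
            StateBackendSketch backend, boolean isSavepoint, String targetDirectory) {
        return isSavepoint
                ? backend.createSavepointStreamFactory(targetDirectory)
                : backend.createStreamFactory();
    }
}
```

With this shape, the checkpointing code never needs to know which backend it is talking to; only the flag and the target directory from the meta data are consulted.
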


