[ https://issues.apache.org/jira/browse/FLINK-5777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ufuk Celebi closed FLINK-5777.
------------------------------
Resolution: Fixed
Fix Version/s: 1.3.0
Fixed in 6e7a91741708a2b167a2bbca5dda5b2059df5e18.
> Pass savepoint information to CheckpointingOperation
> ----------------------------------------------------
>
> Key: FLINK-5777
> URL: https://issues.apache.org/jira/browse/FLINK-5777
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 1.3.0
>
>
> In order to make savepoints self-contained in a single directory, we need to
> pass additional information (whether the checkpoint is a savepoint, and its
> target directory) to {{StreamTask#CheckpointingOperation}}.
> I propose to extend the {{CheckpointMetaData}} for this.
> There is currently some overlap between CheckpointMetaData, CheckpointMetrics,
> and the manually passed checkpoint ID and checkpoint timestamp. We should
> restrict CheckpointMetaData to the essential metadata that needs to be passed
> to StreamTask#CheckpointingOperation.
> This means that we move CheckpointMetrics out of CheckpointMetaData: the
> BarrierBuffer/BarrierTracker create the CheckpointMetrics separately, and the
> metrics are sent back with the acknowledge message.
> CheckpointMetaData should be extended with the following properties:
> - boolean isSavepoint
> - String targetDirectory
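A minimal sketch of what the restricted CheckpointMetaData could look like once it carries the two new properties. This is a hypothetical, simplified class named after the one in the proposal, not Flink's actual implementation; the `forCheckpoint`/`forSavepoint` helpers and the null check on the target directory are illustrative assumptions.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of a CheckpointMetaData restricted to what the
 * CheckpointingOperation needs, plus the two proposed properties.
 * Not Flink's actual class.
 */
final class CheckpointMetaData {
    private final long checkpointId;
    private final long timestamp;
    private final boolean isSavepoint;
    private final String targetDirectory; // null for regular checkpoints

    CheckpointMetaData(long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        if (isSavepoint) {
            // Assumption: a savepoint without a target directory is rejected.
            Objects.requireNonNull(targetDirectory, "savepoints need a target directory");
        }
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }

    /** Hypothetical helper for a regular checkpoint (no target directory). */
    static CheckpointMetaData forCheckpoint(long checkpointId, long timestamp) {
        return new CheckpointMetaData(checkpointId, timestamp, false, null);
    }

    /** Hypothetical helper for a savepoint written to its own directory. */
    static CheckpointMetaData forSavepoint(long checkpointId, long timestamp, String targetDirectory) {
        return new CheckpointMetaData(checkpointId, timestamp, true, targetDirectory);
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    boolean isSavepoint() { return isSavepoint; }
    String getTargetDirectory() { return targetDirectory; }
}
```

With the metrics moved out, the class stays a small immutable value that can be handed unchanged through both trigger paths.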
> There are two code paths that lead to the CheckpointingOperation:
> 1. From CheckpointCoordinator via RPC to StreamTask#triggerCheckpoint
> - Execution#triggerCheckpoint(long, long)
> => triggerCheckpoint(CheckpointMetaData)
> - TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, long, long)
> => TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, CheckpointMetaData)
> - Task#triggerCheckpointBarrier(long, long)
> => Task#triggerCheckpointBarrier(CheckpointMetaData)
> 2. From intermediate streams via the CheckpointBarrier to
> StreamTask#triggerCheckpointOnBarrier
> - triggerCheckpointOnBarrier(CheckpointMetaData)
> => triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)
> - CheckpointBarrier(long, long) => CheckpointBarrier(CheckpointMetaData)
> - AcknowledgeCheckpoint(CheckpointMetaData)
> => AcknowledgeCheckpoint(long, CheckpointMetrics)
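The two code paths can be sketched with hypothetical, simplified stand-ins. Path 1 collapses the RPC hops (Execution, TaskManagerGateway, Task) into a single `triggerCheckpoint(CheckpointMetaData)` call. In path 2, a barrier handler in the role of BarrierBuffer/BarrierTracker creates the CheckpointMetrics itself and passes them next to the metadata carried by the CheckpointBarrier. `StreamTaskSketch`, `BarrierHandlerSketch`, the single `alignmentDurationNanos` metric, and the use of empty metrics on the RPC path are assumptions for illustration, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins named after the classes in the proposal.
// None of this is Flink's actual API.

final class CheckpointMetaData {
    final long checkpointId;
    final long timestamp;
    final boolean isSavepoint;
    final String targetDirectory;

    CheckpointMetaData(long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }
}

/** Metrics created by the barrier handler, no longer embedded in the metadata. */
final class CheckpointMetrics {
    final long alignmentDurationNanos;

    CheckpointMetrics(long alignmentDurationNanos) {
        this.alignmentDurationNanos = alignmentDurationNanos;
    }
}

/** CheckpointBarrier(CheckpointMetaData): the barrier carries the full metadata. */
final class CheckpointBarrier {
    final CheckpointMetaData meta;

    CheckpointBarrier(CheckpointMetaData meta) {
        this.meta = meta;
    }
}

/** AcknowledgeCheckpoint(long, CheckpointMetrics). */
final class AcknowledgeCheckpoint {
    final long checkpointId;
    final CheckpointMetrics metrics;

    AcknowledgeCheckpoint(long checkpointId, CheckpointMetrics metrics) {
        this.checkpointId = checkpointId;
        this.metrics = metrics;
    }
}

final class StreamTaskSketch {
    final List<CheckpointMetaData> operationsRun = new ArrayList<>();
    final List<AcknowledgeCheckpoint> sentAcks = new ArrayList<>();

    // Path 1: reached via RPC; assumed to report empty metrics (no alignment).
    void triggerCheckpoint(CheckpointMetaData meta) {
        runCheckpointingOperation(meta, new CheckpointMetrics(0L));
    }

    // Path 2: reached from the barrier handler with separately created metrics.
    void triggerCheckpointOnBarrier(CheckpointMetaData meta, CheckpointMetrics metrics) {
        runCheckpointingOperation(meta, metrics);
    }

    // Both paths hand the same metadata object to the CheckpointingOperation.
    private void runCheckpointingOperation(CheckpointMetaData meta, CheckpointMetrics metrics) {
        operationsRun.add(meta); // the actual snapshot is omitted in this sketch
        sentAcks.add(new AcknowledgeCheckpoint(meta.checkpointId, metrics));
    }
}

/** Plays the role of BarrierBuffer/BarrierTracker for path 2. */
final class BarrierHandlerSketch {
    private final StreamTaskSketch task;

    BarrierHandlerSketch(StreamTaskSketch task) {
        this.task = task;
    }

    void onBarrier(CheckpointBarrier barrier, long alignmentDurationNanos) {
        // Metrics are created here, separately from the metadata the barrier carries.
        task.triggerCheckpointOnBarrier(barrier.meta, new CheckpointMetrics(alignmentDurationNanos));
    }
}
```

In both paths the CheckpointingOperation receives the same metadata object, and the acknowledgement carries only the checkpoint ID plus the metrics, matching AcknowledgeCheckpoint(long, CheckpointMetrics).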
> The state backends provide a second stream factory that CheckpointingOperation
> uses when the metadata indicates a savepoint. Each state backend can choose
> whether to return its regular checkpoint stream factory in that case or a
> special one for savepoints. That way, backends that don’t checkpoint to a
> file system can easily special-case savepoints.
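A minimal sketch of the factory selection, assuming a hypothetical `StateBackendSketch` interface whose savepoint factory method defaults to the regular checkpoint factory. `StreamFactory`, `createSavepointStreamFactory`, and `selectStreamFactory` are illustrative names, not Flink's API, and the metadata class is reduced to the fields needed here.

```java
// Hypothetical sketch; names follow the proposal but this is not Flink's API.

/** Minimal stand-in for a checkpoint stream factory. */
interface StreamFactory {
    String describe();
}

/** Stand-in for the relevant part of a state backend. */
interface StateBackendSketch {
    /** Regular checkpoint stream factory. */
    StreamFactory createStreamFactory(String jobId);

    /**
     * Additional factory for savepoints. By default a backend simply returns
     * its regular factory; backends that need a different layout override it.
     */
    default StreamFactory createSavepointStreamFactory(String jobId, String targetDirectory) {
        return createStreamFactory(jobId);
    }
}

/** Metadata reduced to the fields used in this sketch. */
final class CheckpointMetaData {
    final long checkpointId;
    final boolean isSavepoint;
    final String targetDirectory;

    CheckpointMetaData(long checkpointId, boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }
}

/** The decision the CheckpointingOperation makes based on the metadata. */
final class CheckpointingOperationSketch {
    static StreamFactory selectStreamFactory(StateBackendSketch backend, String jobId, CheckpointMetaData meta) {
        if (meta.isSavepoint) {
            return backend.createSavepointStreamFactory(jobId, meta.targetDirectory);
        }
        return backend.createStreamFactory(jobId);
    }
}
```

The default method means a backend only has to override the savepoint factory if it needs something special; otherwise savepoints transparently reuse the regular factory.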
> - FsStateBackend: returns a special FsCheckpointStreamFactory with a different
> directory layout
> - MemoryStateBackend: returns the regular checkpoint stream factory
> (MemCheckpointStreamFactory) => the _metadata file will contain all state, as
> the state handles are part of it
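The two backend behaviours above could look roughly like this. It is a hypothetical sketch: the class names ending in `Sketch`, the `location()` method, and the concrete layout (checkpoints under `<checkpointDir>/<jobId>`, savepoint state written directly into the savepoint's target directory) are assumptions chosen to illustrate "a different directory layout", not Flink's actual one.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch; not Flink's FsStateBackend/MemoryStateBackend code.

/** Minimal stand-in for a checkpoint stream factory. */
interface StreamFactory {
    /** Where this factory would write state (for illustration only). */
    String location();
}

final class FsStreamFactory implements StreamFactory {
    private final Path directory;

    FsStreamFactory(Path directory) {
        this.directory = directory;
    }

    @Override
    public String location() {
        return directory.toString();
    }
}

final class MemStreamFactory implements StreamFactory {
    @Override
    public String location() {
        return "<in-memory>";
    }
}

interface StateBackendSketch {
    StreamFactory createStreamFactory(String jobId);

    default StreamFactory createSavepointStreamFactory(String jobId, String targetDirectory) {
        return createStreamFactory(jobId);
    }
}

final class FsStateBackendSketch implements StateBackendSketch {
    private final Path checkpointDir;

    FsStateBackendSketch(String checkpointDir) {
        this.checkpointDir = Paths.get(checkpointDir);
    }

    @Override
    public StreamFactory createStreamFactory(String jobId) {
        // Assumed checkpoint layout: <checkpointDir>/<jobId>
        return new FsStreamFactory(checkpointDir.resolve(jobId));
    }

    @Override
    public StreamFactory createSavepointStreamFactory(String jobId, String targetDirectory) {
        // Assumed savepoint layout: state files go straight into the target
        // directory, keeping the savepoint self-contained.
        return new FsStreamFactory(Paths.get(targetDirectory));
    }
}

final class MemoryStateBackendSketch implements StateBackendSketch {
    @Override
    public StreamFactory createStreamFactory(String jobId) {
        return new MemStreamFactory();
    }
    // No override: savepoints use the regular factory, so the state handles
    // (and with them all state) end up in the _metadata file.
}
```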
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)