[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363952#comment-16363952 ]
ASF GitHub Bot commented on FLINK-8360: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168168381 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java --- @@ -46,26 +52,63 @@ /** */ private final int subtaskIndex; + /** */ + private final Map<Long, TaskStateSnapshot> storedTaskStateByCheckpointID; + + /** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */ + private final File subtaskLocalStateBaseDirectory; + public TaskLocalStateStore( JobID jobID, JobVertexID jobVertexID, - int subtaskIndex) { + int subtaskIndex, + File localStateRootDir) { this.jobID = jobID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; + this.storedTaskStateByCheckpointID = new HashMap<>(); + this.subtaskLocalStateBaseDirectory = + new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex)); + } + + static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) { + return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex; } public void storeLocalState( @Nonnull CheckpointMetaData checkpointMetaData, @Nullable TaskStateSnapshot localState) { - if (localState != null) { - throw new UnsupportedOperationException("Implement this before actually providing local state!"); + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState); + + if (previous != null) { + throw new IllegalStateException("Found previously registered local state for checkpoint with id " + + checkpointMetaData.getCheckpointId() + "! 
This indicated a problem."); } } - public void dispose() { - //TODO + public void dispose() throws Exception { + + Exception collectedException = null; + + for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) { + try { + snapshot.discardState(); + } catch (Exception discardEx) { + collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException); + } + } + + if (collectedException != null) { + throw collectedException; + } + + FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory); --- End diff -- This already works differently in later commits. > Implement task-local state recovery > ----------------------------------- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky as possible. > For starters, we will implement this feature for all managed keyed states and > can easily extend it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)