[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348492#comment-16348492 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165340082
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java ---
@@ -46,26 +52,63 @@
/** */
private final int subtaskIndex;
+ /** */
+ private final Map<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
+
+ /** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */
+ private final File subtaskLocalStateBaseDirectory;
+
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
- int subtaskIndex) {
+ int subtaskIndex,
+ File localStateRootDir) {
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
+ this.storedTaskStateByCheckpointID = new HashMap<>();
+ this.subtaskLocalStateBaseDirectory =
+ new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex));
+ }
+
+ static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) {
+ return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex;
}
public void storeLocalState(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nullable TaskStateSnapshot localState) {
- if (localState != null) {
- throw new UnsupportedOperationException("Implement this before actually providing local state!");
+ TaskStateSnapshot previous =
+ storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState);
+
+ if (previous != null) {
+ throw new IllegalStateException("Found previously registered local state for checkpoint with id " +
+ checkpointMetaData.getCheckpointId() + "! This indicated a problem.");
}
}
- public void dispose() {
- //TODO
+ public void dispose() throws Exception {
+
+ Exception collectedException = null;
+
+ for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) {
+ try {
+ snapshot.discardState();
+ } catch (Exception discardEx) {
+ collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
+ }
+ }
+
+ if (collectedException != null) {
+ throw collectedException;
+ }
+
+ FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
--- End diff --
Is there a way to retry discarding the state handles that could not be
discarded, based on this directory? If not, then we could also delete the
directory in the failure case.
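
One way to read this suggestion, sketched in plain Java: discard every stored
snapshot, delete the base directory unconditionally, and only then rethrow the
collected exception. `LocalStateDisposeSketch`, `Discardable`, `store` and the
local `deleteDirectoryQuietly` helper are hypothetical stand-ins for
`TaskLocalStateStore`, `TaskStateSnapshot`, `storeLocalState` and Flink's
`FileUtils`/`ExceptionUtils`; this is not the code in the pull request. The
sketch assumes the leftover files cannot be used to retry a failed discard.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/** Hypothetical, simplified stand-in for TaskLocalStateStore. */
class LocalStateDisposeSketch {

    /** Hypothetical stand-in for TaskStateSnapshot; only discardState() matters here. */
    interface Discardable {
        void discardState() throws Exception;
    }

    private final Map<Long, Discardable> storedTaskStateByCheckpointID = new HashMap<>();
    private final File subtaskLocalStateBaseDirectory;

    LocalStateDisposeSketch(File subtaskLocalStateBaseDirectory) {
        this.subtaskLocalStateBaseDirectory = subtaskLocalStateBaseDirectory;
    }

    void store(long checkpointId, Discardable localState) {
        if (storedTaskStateByCheckpointID.put(checkpointId, localState) != null) {
            throw new IllegalStateException(
                "Local state already registered for checkpoint " + checkpointId);
        }
    }

    void dispose() throws Exception {
        Exception collectedException = null;

        for (Discardable snapshot : storedTaskStateByCheckpointID.values()) {
            try {
                snapshot.discardState();
            } catch (Exception discardEx) {
                // Keep the first failure and attach later ones as suppressed
                // (a plain-Java substitute for ExceptionUtils.firstOrSuppressed).
                if (collectedException == null) {
                    collectedException = discardEx;
                } else {
                    collectedException.addSuppressed(discardEx);
                }
            }
        }
        storedTaskStateByCheckpointID.clear();

        // Delete the directory before rethrowing, so it is removed on failure too.
        deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);

        if (collectedException != null) {
            throw collectedException;
        }
    }

    /** Best-effort recursive delete; I/O errors are deliberately swallowed. */
    static void deleteDirectoryQuietly(File directory) {
        if (directory == null || !directory.exists()) {
            return;
        }
        try (Stream<Path> paths = Files.walk(directory.toPath())) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException | UncheckedIOException ignored) {
            // Quiet by design.
        }
    }
}
```

If retrying the failed discards from the directory contents turns out to be
possible, keeping the directory on failure (as in the diff) would be the
better choice.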
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.