[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363952#comment-16363952
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168168381
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
    @@ -46,26 +52,63 @@
        /** */
        private final int subtaskIndex;
     
    +   /** */
    +   private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
    +
    +   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
    +   private final File subtaskLocalStateBaseDirectory;
    +
        public TaskLocalStateStore(
                JobID jobID,
                JobVertexID jobVertexID,
    -           int subtaskIndex) {
    +           int subtaskIndex,
    +           File localStateRootDir) {
     
                this.jobID = jobID;
                this.jobVertexID = jobVertexID;
                this.subtaskIndex = subtaskIndex;
    +           this.storedTaskStateByCheckpointID = new HashMap<>();
    +           this.subtaskLocalStateBaseDirectory =
    +                   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
    +   }
    +
    +   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
    +           return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
        }
     
        public void storeLocalState(
                @Nonnull CheckpointMetaData checkpointMetaData,
                @Nullable TaskStateSnapshot localState) {
     
    -           if (localState != null) {
    -                   throw new UnsupportedOperationException("Implement this 
before actually providing local state!");
    +           TaskStateSnapshot previous =
    +                   
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), 
localState);
    +
    +           if (previous != null) {
    +                   throw new IllegalStateException("Found previously 
registered local state for checkpoint with id " +
    +                           checkpointMetaData.getCheckpointId() + "! This 
indicated a problem.");
                }
        }
     
    -   public void dispose() {
    -           //TODO
    +   public void dispose() throws Exception {
    +
    +           Exception collectedException = null;
    +
    +           for (TaskStateSnapshot snapshot : 
storedTaskStateByCheckpointID.values()) {
    +                   try {
    +                           snapshot.discardState();
    +                   } catch (Exception discardEx) {
    +                           collectedException = 
ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
    +                   }
    +           }
    +
    +           if (collectedException != null) {
    +                   throw collectedException;
    +           }
    +
    +           
FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
    --- End diff --
    
    This already works different in later commits.


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky is possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to