[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365772#comment-16365772 ]

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168514624
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java ---
    @@ -20,35 +20,47 @@
     
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
    -import org.apache.flink.util.Preconditions;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     
     import java.io.File;
    -import java.util.HashMap;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
     
     /**
      * This class holds the all {@link TaskLocalStateStore} objects for a task executor (manager).
    - *
    - * TODO: this still still work in progress and partially still acts as a placeholder.
      */
     public class TaskExecutorLocalStateStoresManager {
     
    +   /** Logger for this class. */
    +   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
    +
        /**
         * This map holds all local state stores for tasks running on the task manager / executor that own the instance of
    -    * this.
    +    * this. Maps from allocation id to all the subtask's local state stores.
         */
    -   private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> taskStateStoresByJobID;
    +   private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> taskStateStoresByAllocationID;
     
        /** This is the root directory for all local state of this task manager / executor. */
        private final File[] localStateRootDirectories;
     
    -   public TaskExecutorLocalStateStoresManager(File[] localStateRootDirectories) {
    -           this.taskStateStoresByJobID = new HashMap<>();
    -           this.localStateRootDirectories = Preconditions.checkNotNull(localStateRootDirectories);
    +   /** Executor that runs the discarding of released state objects. */
    +   private final Executor discardExecutor;
    +
    +   public TaskExecutorLocalStateStoresManager(
    +           @Nonnull File[] localStateRootDirectories,
    +           @Nonnull Executor discardExecutor) {
    --- End diff --
    
    At least it highlights potential problems in the IDE, and you can 
    instruct the compiler to insert runtime checks if you wish (when 
    compiling with IntelliJ, this is the default).
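The comment above concerns the `@Nonnull` constructor parameters that replace the explicit `Preconditions.checkNotNull` call. Plain `javac` treats `javax.annotation.Nonnull` as metadata only, so a build without IDE instrumentation gets no runtime check from it. The following is a minimal sketch, assuming a simplified manager, of keeping an explicit fail-fast check while also mirroring the other changes in the diff (a `ConcurrentHashMap` keyed by allocation id and a separate `Executor` for discarding released state). `LocalStateStoresSketch`, `LocalStore`, `storeFor` and `releaseAllocation` are hypothetical names, `String` keys stand in for `AllocationID` and `JobVertexSubtaskKey`, and `java.util.Objects.requireNonNull` is used instead of Flink's `Preconditions` to stay dependency-free.

```java
import java.io.File;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch, not Flink's TaskExecutorLocalStateStoresManager.
// String keys stand in for AllocationID and JobVertexSubtaskKey.
public class LocalStateStoresSketch {

    /** Hypothetical stand-in for TaskLocalStateStore. */
    public static final class LocalStore {
        private volatile boolean discarded;

        void discard() {
            // A real store would delete its files under the local root directories here.
            discarded = true;
        }

        public boolean isDiscarded() {
            return discarded;
        }
    }

    // allocation id -> (subtask key -> store); concurrent because registration
    // and slot release may happen on different threads.
    private final Map<String, Map<String, LocalStore>> storesByAllocation = new ConcurrentHashMap<>();

    private final File[] localStateRootDirectories;

    private final Executor discardExecutor;

    public LocalStateStoresSketch(File[] localStateRootDirectories, Executor discardExecutor) {
        // Explicit checks fail fast regardless of the compiler; @Nonnull alone adds none under plain javac.
        this.localStateRootDirectories =
                Objects.requireNonNull(localStateRootDirectories, "localStateRootDirectories");
        this.discardExecutor = Objects.requireNonNull(discardExecutor, "discardExecutor");
    }

    /** Returns the store for a subtask in a slot, creating it on first access. */
    public LocalStore storeFor(String allocationId, String subtaskKey) {
        return storesByAllocation
                .computeIfAbsent(allocationId, id -> new ConcurrentHashMap<>())
                .computeIfAbsent(subtaskKey, key -> new LocalStore());
    }

    /** Unregisters all stores of a released slot and discards them on the discard executor. */
    public void releaseAllocation(String allocationId) {
        Map<String, LocalStore> released = storesByAllocation.remove(allocationId);
        if (released != null) {
            discardExecutor.execute(() -> released.values().forEach(LocalStore::discard));
        }
    }

    public int allocationCount() {
        return storesByAllocation.size();
    }

    public File[] getLocalStateRootDirectories() {
        return localStateRootDirectories;
    }
}
```

Passing `Runnable::run` as the executor discards synchronously, which is convenient in tests; a task executor would presumably use a background pool so that deleting local files does not block the thread that releases the slot.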


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily extend it to all other state types (e.g. operator state) later.
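The recovery order described in the issue can be sketched as a local-first lookup with a fallback to the primary copy. This is a hypothetical illustration, not Flink's implementation: `LocalFirstRecoverySketch`, `restore`, `Restored` and `Source` are invented names, the two `Supplier`s stand in for reading the task-local copy and the primary copy in DFS, and treating a failed local read as "not available" is an assumption.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of local-first recovery; not Flink's actual restore code.
public class LocalFirstRecoverySketch {

    /** Which copy the state was restored from. */
    public enum Source { LOCAL, PRIMARY }

    /** Restored state plus its origin. */
    public static final class Restored<S> {
        public final S state;
        public final Source source;

        Restored(S state, Source source) {
            this.state = state;
            this.source = source;
        }
    }

    /**
     * Tries the secondary, task-local copy first and falls back to the primary
     * copy (in DFS) when no usable local copy exists.
     */
    public static <S> Restored<S> restore(Supplier<Optional<S>> localCopy, Supplier<S> primaryCopy) {
        Optional<S> local;
        try {
            local = localCopy.get();
        } catch (RuntimeException e) {
            // Assumption: a failed local read is treated like a missing copy.
            local = Optional.empty();
        }
        if (local != null && local.isPresent()) {
            // Restored from local disk: no state had to be fetched over the network.
            return new Restored<>(local.get(), Source.LOCAL);
        }
        return new Restored<>(primaryCopy.get(), Source.PRIMARY);
    }
}
```

The primary copy stays authoritative in this sketch: the local copy is only an optimization, so any doubt about it leads to the remote read rather than a failed recovery.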



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to