[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367129#comment-16367129
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168752498
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
    @@ -18,81 +18,267 @@
     
     package org.apache.flink.runtime.state;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.FileUtils;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
    +import javax.annotation.concurrent.GuardedBy;
     
    +import java.io.File;
    +import java.io.IOException;
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.concurrent.Executor;
     
     /**
    - * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
    - *
    - * TODO: this still still work in progress and partially still acts as a 
placeholder.
    + * This class holds the all {@link TaskLocalStateStoreImpl} objects for a 
task executor (manager).
      */
     public class TaskExecutorLocalStateStoresManager {
     
    +   /** Logger for this class. */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
    +
        /**
         * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
    -    * this.
    +    * this. Maps from allocation id to all the subtask's local state 
stores.
         */
    -   private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> 
taskStateManagers;
    +   @GuardedBy("lock")
    +   private final Map<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
    +
    +   /** The configured mode for local recovery on this task manager. */
    +   private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
    +
    +   /** This is the root directory for all local state of this task manager 
/ executor. */
    +   private final File[] localStateRootDirectories;
    +
    +   /** Executor that runs the discarding of released state objects. */
    +   private final Executor discardExecutor;
    +
    +   /** Guarding lock for taskStateStoresByAllocationID and closed-flag. */
    +   private final Object lock;
    +
    +   private final Thread shutdownHook;
     
    -   public TaskExecutorLocalStateStoresManager() {
    -           this.taskStateManagers = new HashMap<>();
    +   @GuardedBy("lock")
    +   private boolean closed;
    +
    +   public TaskExecutorLocalStateStoresManager(
    +           @Nonnull LocalRecoveryConfig.LocalRecoveryMode 
localRecoveryMode,
    +           @Nonnull File[] localStateRootDirectories,
    +           @Nonnull Executor discardExecutor) {
    +
    +           this.taskStateStoresByAllocationID = new HashMap<>();
    +           this.localRecoveryMode = localRecoveryMode;
    +           this.localStateRootDirectories = localStateRootDirectories;
    +           this.discardExecutor = discardExecutor;
    +           this.lock = new Object();
    +           this.closed = false;
    +
    +           for (File localStateRecoveryRootDir : 
localStateRootDirectories) {
    +                   if (!localStateRecoveryRootDir.exists()) {
    +
    +                           if (!localStateRecoveryRootDir.mkdirs()) {
    +                                   throw new IllegalStateException("Could 
not create root directory for local recovery: " +
    +                                           localStateRecoveryRootDir);
    +                           }
    +                   }
    +           }
    +
    +           // install a shutdown hook
    +           this.shutdownHook = new 
Thread("TaskExecutorLocalStateStoresManager shutdown hook") {
    +                   @Override
    +                   public void run() {
    +                           shutdown();
    +                   }
    +           };
    +           try {
    +                   Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    +           } catch (IllegalStateException e) {
    +                   // race, JVM is in shutdown already, we can safely 
ignore this
    +                   LOG.debug("Unable to add shutdown hook for 
TaskExecutorLocalStateStoresManager, shutdown already in progress", e);
    +           } catch (Throwable t) {
    +                   LOG.warn("Error while adding shutdown hook for 
TaskExecutorLocalStateStoresManager", t);
    +           }
        }
     
    -   public TaskLocalStateStore localStateStoreForTask(
    -           JobID jobId,
    -           JobVertexID jobVertexID,
    -           int subtaskIndex) {
    +   @Nonnull
    +   public TaskLocalStateStore localStateStoreForSubtask(
    +           @Nonnull JobID jobId,
    +           @Nonnull AllocationID allocationID,
    +           @Nonnull JobVertexID jobVertexID,
    +           @Nonnegative int subtaskIndex) {
    +
    +           synchronized (lock) {
    +
    +                   if (closed) {
    +                           throw new 
IllegalStateException("TaskExecutorLocalStateStoresManager is already closed 
and cannot " +
    +                                   "register a new TaskLocalStateStore.");
    +                   }
    +
    +                   final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
taskStateManagers =
    +                           
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new 
HashMap<>());
    +
    +                   final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
     
    -           Preconditions.checkNotNull(jobId);
    -           final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
    +                   // create the allocation base dirs, one inside each 
root dir.
    +                   File[] allocationBaseDirectories = 
allocationBaseDirectories(allocationID);
     
    -           final Map<JobVertexSubtaskKey, TaskLocalStateStore> 
taskStateManagers =
    -                   this.taskStateManagers.computeIfAbsent(jobId, k -> new 
HashMap<>());
    +                   LocalRecoveryDirectoryProviderImpl directoryProvider = 
new LocalRecoveryDirectoryProviderImpl(
    +                           allocationBaseDirectories,
    +                           jobId,
    +                           allocationID,
    +                           jobVertexID,
    +                           subtaskIndex);
     
    -           return taskStateManagers.computeIfAbsent(
    -                   taskKey, k -> new TaskLocalStateStore(jobId, 
jobVertexID, subtaskIndex));
    +                   LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(
    +                           localRecoveryMode,
    +                           directoryProvider);
    +
    +                   return taskStateManagers.computeIfAbsent(
    +                           taskKey,
    +                           k -> new TaskLocalStateStoreImpl(
    +                                   jobId,
    +                                   allocationID,
    +                                   jobVertexID,
    +                                   subtaskIndex,
    +                                   localRecoveryConfig,
    +                                   discardExecutor));
    +           }
        }
     
    -   public void releaseJob(JobID jobID) {
    +   public void releaseLocalStateForAllocationId(@Nonnull AllocationID 
allocationID) {
     
    --- End diff --
    
    In `TaskExecutorLocalStateStoresManager` I only found 
`releaseLocalStateForAllocationId()`, which releases resources by 
`allocationID`. Is a method for releasing resources by `jobID` missing? I think 
one is needed for the case where a job is cancelled. 


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to