Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168514624
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
    @@ -20,35 +20,47 @@
     
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
    -import org.apache.flink.util.Preconditions;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     
     import java.io.File;
    -import java.util.HashMap;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
     
     /**
      * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
    - *
    - * TODO: this still still work in progress and partially still acts as a 
placeholder.
      */
     public class TaskExecutorLocalStateStoresManager {
     
    +   /** Logger for this class. */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
    +
        /**
         * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
    -    * this.
    +    * this. Maps from allocation id to all the subtask's local state 
stores.
         */
    -   private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> 
taskStateStoresByJobID;
    +   private final Map<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStore>> taskStateStoresByAllocationID;
     
        /** This is the root directory for all local state of this task manager 
/ executor. */
        private final File[] localStateRootDirectories;
     
    -   public TaskExecutorLocalStateStoresManager(File[] 
localStateRootDirectories) {
    -           this.taskStateStoresByJobID = new HashMap<>();
    -           this.localStateRootDirectories = 
Preconditions.checkNotNull(localStateRootDirectories);
    +   /** Executor that runs the discarding of released state objects. */
    +   private final Executor discardExecutor;
    +
    +   public TaskExecutorLocalStateStoresManager(
    +           @Nonnull File[] localStateRootDirectories,
    +           @Nonnull Executor discardExecutor) {
    --- End diff --
    
    At least it is highlighting potential problems in the IDE and you can 
instruct the compiler to introduce runtime checks if you wish so (if you 
compile with Intellij this is the default).


---

Reply via email to