Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168496761
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
    @@ -44,11 +44,21 @@
        private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> 
taskStateManagers;
     
        /** This is the root directory for all local state of this task manager 
/ executor. */
    -   private final File localStateRootDirectory;
    +   private final File[] localStateRootDirectories;
     
    -   public TaskExecutorLocalStateStoresManager(File 
localStateRootDirectory) {
    +   public TaskExecutorLocalStateStoresManager(File[] 
localStateRootDirectories) {
                this.taskStateManagers = new HashMap<>();
    -           this.localStateRootDirectory = 
Preconditions.checkNotNull(localStateRootDirectory);
    +           this.localStateRootDirectories = 
Preconditions.checkNotNull(localStateRootDirectories);
    +
    +           for (File localStateRecoveryRootDir : 
localStateRootDirectories) {
    +                   if (!localStateRecoveryRootDir.exists()) {
    +
    +                           if (!localStateRecoveryRootDir.mkdirs()) {
    +                                   throw new IllegalStateException("Could 
not create root directory for local recovery: " +
    --- End diff --
    
    👍 Good point


---

Reply via email to