GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168480072
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
    @@ -55,25 +54,24 @@
        /** */
        private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
     
    -   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
    -   private final File subtaskLocalStateBaseDirectory;
    +   /** The root directories for all local state of this {@link 
TaskLocalStateStore}. */
    +   private final File[] localStateRootDirectories;
     
        public TaskLocalStateStore(
                JobID jobID,
                JobVertexID jobVertexID,
                int subtaskIndex,
    -           File localStateRootDir) {
    +           File[] localStateRootDirectories) {
     
                this.jobID = jobID;
                this.jobVertexID = jobVertexID;
                this.subtaskIndex = subtaskIndex;
                this.storedTaskStateByCheckpointID = new HashMap<>();
    -           this.subtaskLocalStateBaseDirectory =
    -                   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
    +           this.localStateRootDirectories = localStateRootDirectories;
        }
     
    -   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
    -           return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
    +   protected String createSubtaskPath() {
    +           return jobID + File.separator + jobVertexID + File.separator + 
subtaskIndex;
    --- End diff --
    
    Maybe use `Paths.get` to build this path instead of concatenating the parts with `File.separator`?


---

Reply via email to