Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168480072
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
@@ -55,25 +54,24 @@
/** */
private final Map<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
- /** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */
- private final File subtaskLocalStateBaseDirectory;
+ /** The root directories for all local state of this {@link TaskLocalStateStore}. */
+ private final File[] localStateRootDirectories;
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
int subtaskIndex,
- File localStateRootDir) {
+ File[] localStateRootDirectories) {
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
this.storedTaskStateByCheckpointID = new HashMap<>();
- this.subtaskLocalStateBaseDirectory =
- new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex));
+ this.localStateRootDirectories = localStateRootDirectories;
}
- static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) {
- return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex;
+ protected String createSubtaskPath() {
+ return jobID + File.separator + jobVertexID + File.separator + subtaskIndex;
--- End diff --
Maybe use `Paths.get` to build this path instead of concatenating with `File.separator`?
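
A minimal sketch of what that could look like, assuming plain strings stand in for Flink's `JobID` and `JobVertexID`. The class `SubtaskPathSketch` and its static three-argument `createSubtaskPath` are hypothetical and only illustrate the idea; they are not the PR's code:

```java
import java.io.File;
import java.nio.file.Paths;

// Hypothetical stand-in class; not part of Flink.
class SubtaskPathSketch {

    // Assumption: plain strings replace JobID / JobVertexID for illustration.
    // Paths.get joins the segments with the default file system's separator.
    static String createSubtaskPath(String jobID, String jobVertexID, int subtaskIndex) {
        return Paths.get(jobID, jobVertexID, String.valueOf(subtaskIndex)).toString();
    }

    public static void main(String[] args) {
        String viaPaths = createSubtaskPath("job", "vertex", 3);
        // The manual concatenation from the diff, for comparison.
        String viaConcat = "job" + File.separator + "vertex" + File.separator + 3;
        System.out.println(viaPaths + " matches concatenation: " + viaPaths.equals(viaConcat));
    }
}
```

For simple segments like these, both variants should yield the same relative path on the default file system; `Paths.get` just avoids handling the separator by hand.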
---