rohangarg commented on code in PR #13476:
URL: https://github.com/apache/druid/pull/13476#discussion_r1045845169


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java:
##########
@@ -173,24 +184,32 @@ public Collection<TaskRunnerWorkItem> getKnownTasks()
   @GuardedBy("tasks")
   protected void saveRunningTasks()
   {
-    final File restoreFile = getRestoreFile();
-    final List<String> theTasks = new ArrayList<>();
+    final Map<File, List<String>> theTasks = new HashMap<>();
     for (TaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
-      theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
+      final String taskId = forkingTaskRunnerWorkItem.getTaskId();
+      final File restoreFile = getRestoreFile(taskId);
+      theTasks.computeIfAbsent(restoreFile, k -> new ArrayList<>())
+              .add(taskId);
     }
 
-    try {
-      Files.createParentDirs(restoreFile);
-      jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
-    }
-    catch (Exception e) {
-      LOG.warn(e, "Failed to save tasks to restore file[%s]. Skipping this 
save.", restoreFile);
+    for (Map.Entry<File, List<String>> entry : theTasks.entrySet()) {
+      final File restoreFile = entry.getKey();
+      final TaskRestoreInfo taskRestoreInfo = new 
TaskRestoreInfo(entry.getValue());
+      try {
+        Files.createParentDirs(restoreFile);
+        jsonMapper.writeValue(restoreFile, taskRestoreInfo);
+        LOG.info("Save restore file at [%s] for tasks [%s]",

Review Comment:
   I think logging this on every task start/complete would be too much - it 
would be better to log only in the exceptional case, along with the task ids 
we're going to skip for this cycle.
   Also, can the tasks that are running at that point still be saved in the 
next cycle, in case one of the tasks finishes? 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -292,6 +320,16 @@ public boolean isEncapsulatedTask()
     return encapsulatedTask;
   }
 
+  public synchronized void addTask(final String taskId, final File taskDir)
+  {
+    tracker.addTask(taskId, taskDir);
+  }
+
+  public synchronized void removeTask(final String taskId)

Review Comment:
   I think `addTask` is not really needed anymore since `getBaseTaskDir` should 
automatically do the `addTask` part. `removeTask` could probably be renamed to 
`stopTracking(taskId)` if we remove `addTask`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -190,15 +207,31 @@ public String getBaseDir()
     return baseDir;
   }
 
+  @JsonProperty("baseTaskDir")
+  public String getBaseTaskDirPath()
+  {
+    return baseTaskDirPath;
+  }
+
   @JsonProperty
-  public File getBaseTaskDir()
+  public List<String> getBaseTaskDirPaths()
   {
-    return baseTaskDir;
+    return baseTaskDirPaths;
+  }
+
+  public List<File> getBaseTaskDirs()
+  {
+    return tracker.getBaseTaskDirs();
+  }
+
+  public synchronized File getBaseTaskDir(String taskId)

Review Comment:
   no need for synchronization on taskConfig object now - the tracker class 
should manage that



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java:
##########
@@ -74,38 +77,46 @@ public BaseRestorableTaskRunner(
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
-    final File restoreFile = getRestoreFile();
-    final TaskRestoreInfo taskRestoreInfo;
-    if (restoreFile.exists()) {
-      try {
-        taskRestoreInfo = jsonMapper.readValue(restoreFile, 
TaskRestoreInfo.class);
-      }
-      catch (Exception e) {
-        LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping 
restore.", restoreFile);
-        return ImmutableList.of();
+    final Map<File, TaskRestoreInfo> taskRestoreInfos = new HashMap<>();
+    for (File baseDir : taskConfig.getBaseTaskDirs()) {
+      File restoreFile = new File(baseDir, TASK_RESTORE_FILENAME);
+      if (restoreFile.exists()) {
+        try {
+          taskRestoreInfos.put(baseDir, jsonMapper.readValue(restoreFile, 
TaskRestoreInfo.class));
+        }
+        catch (Exception e) {
+          LOG.error(e, "Failed to read restorable tasks from file[%s]. 
Skipping restore.", restoreFile);
+        }
       }
-    } else {
-      return ImmutableList.of();
     }
 
     final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new 
ArrayList<>();
-    for (final String taskId : taskRestoreInfo.getRunningTasks()) {
-      try {
-        final File taskFile = new File(taskConfig.getTaskDir(taskId), 
"task.json");
-        final Task task = jsonMapper.readValue(taskFile, Task.class);
-
-        if (!task.getId().equals(taskId)) {
-          throw new ISE("Task[%s] restore file had wrong id[%s]", taskId, 
task.getId());
+    for (Map.Entry<File, TaskRestoreInfo> entry : taskRestoreInfos.entrySet()) 
{
+      final File baseDir = entry.getKey();
+      final TaskRestoreInfo taskRestoreInfo = entry.getValue();
+      for (final String taskId : taskRestoreInfo.getRunningTasks()) {
+        try {
+          taskConfig.addTask(taskId, baseDir);

Review Comment:
   why are you explicitly adding the task? shouldn't `getTaskDir` do it 
implicitly?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -300,4 +338,50 @@ private String defaultDir(@Nullable String 
configParameter, final String default
 
     return configParameter;
   }
+
+  private static class TaskStorageDirTracker
+  {
+    private int taskDirIndex = 0;
+
+    private final List<File> baseTaskDirs = new ArrayList<>();
+
+    private final Map<String, File> taskToTempDirMap = new HashMap<>();
+
+    TaskStorageDirTracker(final List<String> baseTaskDirPaths)
+    {
+      for (String baseTaskDirPath : baseTaskDirPaths) {
+        baseTaskDirs.add(new File(baseTaskDirPath));
+      }
+    }
+
+    List<File> getBaseTaskDirs()
+    {
+      return baseTaskDirs;
+    }
+
+    synchronized File getOrSelectTaskDir(final String taskId)
+    {
+      if (!taskToTempDirMap.containsKey(taskId)) {
+        addTask(taskId, baseTaskDirs.get(taskDirIndex));
+        taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
+      }
+
+      return taskToTempDirMap.get(taskId);
+    }
+
+    synchronized void addTask(final String taskId, final File taskDir)

Review Comment:
   please make this private so that it is clear that the only way of adding a 
task to the tracker currently is via `getOrSelectTaskDir`



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java:
##########
@@ -103,11 +112,30 @@ public KubernetesTaskRunnerTest()
         new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
+    this.useMultipleBaseTaskDirPaths = useMultipleBaseTaskDirPaths;
   }
 
-  @BeforeEach
-  void setUp()
+  @Parameterized.Parameters(name = "useMultipleBaseTaskDirPaths = {0}")
+  public static Collection<Object[]> getParameters()
   {
+    Object[][] parameters = new Object[][]{
+        {false},
+        {true}
+    };
+
+    return Arrays.asList(parameters);
+  }
+
+  @Before
+  public void setUp()
+  {
+    List<String> baseTaskDirPaths = null;
+    if (useMultipleBaseTaskDirPaths) {
+      baseTaskDirPaths = ImmutableList.of(
+          FileUtils.createTempDir().toString(),

Review Comment:
   better to use `TemporaryFolder` in the test class and use that to create 
temp directory for test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to