kfaraz commented on code in PR #14643:
URL: https://github.com/apache/druid/pull/14643#discussion_r1274630737


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -134,46 +134,37 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
-    ).getResult();
+    return runOrJoinTask(task, true);
   }
 
-  protected ListenableFuture<TaskStatus> joinAsync(Task task)
+  protected ListenableFuture<TaskStatus> runOrJoinTask(Task task, boolean run)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))
-    ).getResult();
-  }
-
-  private TaskStatus runTask(Task task)
-  {
-    return doTask(task, true);
-  }
-
-  private TaskStatus joinTask(Task task)
-  {
-    return doTask(task, false);
+    synchronized (tasks) {
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run))));
+      return tasks.get(task.getId()).getResult();

Review Comment:
   This can be a single chained statement. It would also be more readable if 
the arguments to `computeIfAbsent` were on separate lines, as in the original 
method.
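
   As a rough sketch of the chained form: `computeIfAbsent` returns the value 
already mapped to the key or the newly computed one, so `getResult()` can be 
called on it directly. `ChainedComputeSketch`, `WorkItem`, and `runOrJoin` are 
hypothetical stand-ins here, not the actual Druid classes, and the future is 
completed immediately only to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration; not the real Druid classes.
class ChainedComputeSketch
{
  static class WorkItem
  {
    private final CompletableFuture<String> result;

    WorkItem(CompletableFuture<String> result)
    {
      this.result = result;
    }

    CompletableFuture<String> getResult()
    {
      return result;
    }
  }

  private final ConcurrentHashMap<String, WorkItem> tasks = new ConcurrentHashMap<>();

  CompletableFuture<String> runOrJoin(String taskId)
  {
    synchronized (tasks) {
      // computeIfAbsent returns the existing or newly created work item,
      // so no separate tasks.get(...) lookup is needed.
      return tasks.computeIfAbsent(
          taskId,
          k -> new WorkItem(CompletableFuture.completedFuture("SUCCESS:" + k))
      ).getResult();
    }
  }
}
```

   Calling `runOrJoin` twice with the same id returns the same future, which is 
the behaviour that the two-statement version in the diff also provides.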



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -269,17 +260,17 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = new ArrayList<>();
+    List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
     for (Job job : client.getPeonJobs()) {
       try {
         Task task = adapter.toTask(job);
-        tasks.add(Pair.of(task, joinAsync(task)));
+        restoredTasks.add(Pair.of(task, runOrJoinTask(task, false)));

Review Comment:
   I preferred the original separation between `joinTask` and `runTask`. 
Passing a boolean is cryptic and makes the code less readable.
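
   To illustrate the readability point, here is a minimal sketch in which the 
boolean stays private and callers only see named entry points. The class 
`NamedEntryPointsSketch` and its string-based bodies are hypothetical; only the 
idea of `run`/`join` delegating to a shared `doTask` mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration; not the real KubernetesTaskRunner.
class NamedEntryPointsSketch
{
  // Records what was requested, so the effect of each call is observable.
  final List<String> calls = new ArrayList<>();

  // Call sites read as run(task) or join(task) instead of
  // runOrJoinTask(task, true) or runOrJoinTask(task, false).
  String run(String taskId)
  {
    return doTask(taskId, true);
  }

  String join(String taskId)
  {
    return doTask(taskId, false);
  }

  // The flag is an implementation detail hidden behind the named methods.
  private String doTask(String taskId, boolean run)
  {
    String action = (run ? "run:" : "join:") + taskId;
    calls.add(action);
    return action;
  }
}
```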



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -134,46 +134,37 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
-    ).getResult();
+    return runOrJoinTask(task, true);
   }
 
-  protected ListenableFuture<TaskStatus> joinAsync(Task task)
+  protected ListenableFuture<TaskStatus> runOrJoinTask(Task task, boolean run)

Review Comment:
   It was better to have this as two separate methods; that was more readable 
and easier to understand.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -134,46 +134,37 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
-    ).getResult();
+    return runOrJoinTask(task, true);
   }
 
-  protected ListenableFuture<TaskStatus> joinAsync(Task task)
+  protected ListenableFuture<TaskStatus> runOrJoinTask(Task task, boolean run)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))
-    ).getResult();
-  }
-
-  private TaskStatus runTask(Task task)
-  {
-    return doTask(task, true);
-  }
-
-  private TaskStatus joinTask(Task task)
-  {
-    return doTask(task, false);
+    synchronized (tasks) {
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run))));
+      return tasks.get(task.getId()).getResult();
+    }
   }
 
   @VisibleForTesting
   protected TaskStatus doTask(Task task, boolean run)
   {
-    KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+    try {
+      KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
 
-    KubernetesWorkItem workItem = tasks.get(task.getId());
+      synchronized (tasks) {
+        KubernetesWorkItem workItem = tasks.get(task.getId());
 
-    if (workItem == null) {
-      throw new ISE("Task [%s] disappeared", task.getId());
-    }
+        if (workItem == null) {
+          throw new ISE("Task [%s] disappeared", task.getId());

Review Comment:
   Should we maybe just return a failed `TaskStatus` here instead of throwing 
an exception? The exception thrown by this method may or may not be handled by 
the calling code, and there is no point depending on that when we already know 
the reason for the task failure.
   
   We should do the same thing in the catch block too.
   
   But this doesn't need to be done as a part of this PR, just wanted to call 
it out.
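
   A minimal sketch of that idea, assuming a simplified `Status` type with 
`success` and `failure` factories. `FailedStatusSketch`, `Status`, and the 
`Runnable` work items are hypothetical stand-ins and not the real Druid 
`TaskStatus` or `KubernetesWorkItem`. Both the missing-work-item case and the 
catch block return a failed status carrying the reason instead of propagating 
an exception.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration; not the real Druid classes.
class FailedStatusSketch
{
  static final class Status
  {
    final String taskId;
    final boolean success;
    final String errorMsg;

    private Status(String taskId, boolean success, String errorMsg)
    {
      this.taskId = taskId;
      this.success = success;
      this.errorMsg = errorMsg;
    }

    static Status success(String taskId)
    {
      return new Status(taskId, true, null);
    }

    static Status failure(String taskId, String errorMsg)
    {
      return new Status(taskId, false, errorMsg);
    }
  }

  final Map<String, Runnable> tasks = new ConcurrentHashMap<>();

  Status doTask(String taskId)
  {
    try {
      Runnable workItem = tasks.get(taskId);
      if (workItem == null) {
        // The failure reason is already known, so report it directly.
        return Status.failure(taskId, "Task [" + taskId + "] disappeared");
      }
      workItem.run();
      return Status.success(taskId);
    }
    catch (RuntimeException e) {
      // Same treatment in the catch block: convert to a failed status.
      return Status.failure(taskId, "Task [" + taskId + "] failed: " + e.getMessage());
    }
  }
}
```

   With this shape, the caller always receives a status and does not need to 
handle exceptions from `doTask` to learn why a task failed.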



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

