This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bd07c3dd43 Don't need to double synchronize on simple map operations (#14435)
bd07c3dd43 is described below

commit bd07c3dd43428608678e337c7935f237a64f533d
Author: George Shiqi Wu <[email protected]>
AuthorDate: Sat Jun 17 20:30:37 2023 -0400

    Don't need to double synchronize on simple map operations (#14435)
    
    * Don't need to double synchronize on simple map operations
    
    * remove lock
---
 .../druid/k8s/overlord/KubernetesTaskRunner.java   | 60 +++++++++-------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index fe2f4be371..30bea70416 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -134,18 +134,16 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    synchronized (tasks) {
-      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, 
exec.submit(() -> runTask(task))));
-      return tasks.get(task.getId()).getResult();
-    }
+    return tasks.computeIfAbsent(
+        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> 
runTask(task)))
+    ).getResult();
   }
 
   protected ListenableFuture<TaskStatus> joinAsync(Task task)
   {
-    synchronized (tasks) {
-      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, 
exec.submit(() -> joinTask(task))));
-      return tasks.get(task.getId()).getResult();
-    }
+    return tasks.computeIfAbsent(
+        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> 
joinTask(task)))
+    ).getResult();
   }
 
   private TaskStatus runTask(Task task)
@@ -163,20 +161,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   {
     KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
 
-    synchronized (tasks) {
-      KubernetesWorkItem workItem = tasks.get(task.getId());
-
-      if (workItem == null) {
-        throw new ISE("Task [%s] disappeared", task.getId());
-      }
+    KubernetesWorkItem workItem = tasks.get(task.getId());
 
-      if (workItem.isShutdownRequested()) {
-        throw new ISE("Task [%s] has been shut down", task.getId());
-      }
+    if (workItem == null) {
+      throw new ISE("Task [%s] disappeared", task.getId());
+    }
 
-      workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    if (workItem.isShutdownRequested()) {
+      throw new ISE("Task [%s] has been shut down", task.getId());
     }
 
+    workItem.setKubernetesPeonLifecycle(peonLifecycle);
+
     try {
       TaskStatus taskStatus;
       if (run) {
@@ -202,9 +198,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     }
 
     finally {
-      synchronized (tasks) {
-        tasks.remove(task.getId());
-      }
+      tasks.remove(task.getId());
     }
   }
 
@@ -322,9 +316,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
   {
-    synchronized (tasks) {
-      return Lists.newArrayList(tasks.values());
-    }
+    return Lists.newArrayList(tasks.values());
   }
 
 
@@ -393,23 +385,19 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public Collection<TaskRunnerWorkItem> getRunningTasks()
   {
-    synchronized (tasks) {
-      return tasks.values()
-          .stream()
-          .filter(KubernetesWorkItem::isRunning)
-          .collect(Collectors.toList());
-    }
+    return tasks.values()
+        .stream()
+        .filter(KubernetesWorkItem::isRunning)
+        .collect(Collectors.toList());
   }
 
   @Override
   public Collection<TaskRunnerWorkItem> getPendingTasks()
   {
-    synchronized (tasks) {
-      return tasks.values()
-          .stream()
-          .filter(KubernetesWorkItem::isPending)
-          .collect(Collectors.toList());
-    }
+    return tasks.values()
+        .stream()
+        .filter(KubernetesWorkItem::isPending)
+        .collect(Collectors.toList());
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to