This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b88b78ba4 Fix race condition in KubernetesTaskRunner when task is added to the map (#14643)
9b88b78ba4 is described below

commit 9b88b78ba44e3de64dfa9978e95e3ee3cda221b6
Author: YongGang <[email protected]>
AuthorDate: Thu Jul 27 00:04:36 2023 -0700

    Fix race condition in KubernetesTaskRunner when task is added to the map (#14643)
    
    Changes:
    - Fix race condition in KubernetesTaskRunner introduced by #14435
    - Perform addition and removal from map inside a synchronized block
    - Update tests
---
 .../druid/k8s/overlord/KubernetesTaskRunner.java   | 61 ++++++++++++----------
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |  9 ++--
 2 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 30bea70416..12c5bb6029 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -134,16 +134,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> 
runTask(task)))
-    ).getResult();
+    synchronized (tasks) {
+      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
+                  .getResult();
+    }
   }
 
   protected ListenableFuture<TaskStatus> joinAsync(Task task)
   {
-    return tasks.computeIfAbsent(
-        task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> 
joinTask(task)))
-    ).getResult();
+    synchronized (tasks) {
+      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
+                  .getResult();
+    }
   }
 
   private TaskStatus runTask(Task task)
@@ -159,21 +161,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @VisibleForTesting
   protected TaskStatus doTask(Task task, boolean run)
   {
-    KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+    try {
+      KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
 
-    KubernetesWorkItem workItem = tasks.get(task.getId());
+      synchronized (tasks) {
+        KubernetesWorkItem workItem = tasks.get(task.getId());
 
-    if (workItem == null) {
-      throw new ISE("Task [%s] disappeared", task.getId());
-    }
+        if (workItem == null) {
+          throw new ISE("Task [%s] disappeared", task.getId());
+        }
 
-    if (workItem.isShutdownRequested()) {
-      throw new ISE("Task [%s] has been shut down", task.getId());
-    }
+        if (workItem.isShutdownRequested()) {
+          throw new ISE("Task [%s] has been shut down", task.getId());
+        }
 
-    workItem.setKubernetesPeonLifecycle(peonLifecycle);
+        workItem.setKubernetesPeonLifecycle(peonLifecycle);
+      }
 
-    try {
       TaskStatus taskStatus;
       if (run) {
         taskStatus = peonLifecycle.run(
@@ -191,14 +195,14 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 
       return taskStatus;
     }
-
     catch (Exception e) {
       log.error(e, "Task [%s] execution caught an exception", task.getId());
       throw new RuntimeException(e);
     }
-
     finally {
-      tasks.remove(task.getId());
+      synchronized (tasks) {
+        tasks.remove(task.getId());
+      }
     }
   }
 
@@ -269,17 +273,17 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = new ArrayList<>();
+    List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new 
ArrayList<>();
     for (Job job : client.getPeonJobs()) {
       try {
         Task task = adapter.toTask(job);
-        tasks.add(Pair.of(task, joinAsync(task)));
+        restoredTasks.add(Pair.of(task, joinAsync(task)));
       }
       catch (IOException e) {
         log.error(e, "Error deserializing task from job [%s]", 
job.getMetadata().getName());
       }
     }
-    return tasks;
+    return restoredTasks;
   }
 
   @Override
@@ -319,7 +323,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     return Lists.newArrayList(tasks.values());
   }
 
-
   @Override
   public Optional<ScalingStats> getScalingStats()
   {
@@ -386,18 +389,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   public Collection<TaskRunnerWorkItem> getRunningTasks()
   {
     return tasks.values()
-        .stream()
-        .filter(KubernetesWorkItem::isRunning)
-        .collect(Collectors.toList());
+                .stream()
+                .filter(KubernetesWorkItem::isRunning)
+                .collect(Collectors.toList());
   }
 
   @Override
   public Collection<TaskRunnerWorkItem> getPendingTasks()
   {
     return tasks.values()
-        .stream()
-        .filter(KubernetesWorkItem::isPending)
-        .collect(Collectors.toList());
+                .stream()
+                .filter(KubernetesWorkItem::isPending)
+                .collect(Collectors.toList());
   }
 
   @Nullable
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 4a3dd322a7..0ef053a619 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -32,7 +32,6 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
@@ -237,17 +236,17 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   }
 
   @Test
-  public void test_doTask_withoutWorkItem_throwsISE()
+  public void test_doTask_withoutWorkItem_throwsRuntimeException()
   {
     Assert.assertThrows(
         "Task [id] disappeared",
-        ISE.class,
+        RuntimeException.class,
         () -> runner.doTask(task, true)
     );
   }
 
   @Test
-  public void test_doTask_whenShutdownRequested_throwsISE()
+  public void test_doTask_whenShutdownRequested_throwsRuntimeException()
   {
     KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
     workItem.shutdown();
@@ -256,7 +255,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
 
     Assert.assertThrows(
         "Task [id] has been shut down",
-        ISE.class,
+        RuntimeException.class,
         () -> runner.doTask(task, true)
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to