This is an automated email from the ASF dual-hosted git repository.

davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b0de61d1 Move some lifecycle management from doTask -> shutdown for 
the mm-less task runner (#14895)
95b0de61d1 is described below

commit 95b0de61d172a323aead25571b8b83b4ae0b7120
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Aug 25 12:50:38 2023 -0400

    Move some lifecycle management from doTask -> shutdown for the mm-less task 
runner (#14895)
    
    * save work
    
    * Add synchronized
    
    * Don't shutdown in run
    
    * Adding unit tests
    
    * Cleanup lifecycle
    
    * Fix tests
    
    * remove newline
---
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  9 ++--
 .../druid/k8s/overlord/KubernetesTaskRunner.java   | 24 +++++----
 .../druid/k8s/overlord/KubernetesWorkItem.java     |  9 ----
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  | 14 ++---
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 62 +++++++++++-----------
 .../druid/k8s/overlord/KubernetesWorkItemTest.java |  2 -
 6 files changed, 52 insertions(+), 68 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index f6b15f46bc..4814d8cbb6 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -137,7 +137,6 @@ public class KubernetesPeonLifecycle
     }
     catch (Exception e) {
       log.info("Failed to run task: %s", taskId.getOriginalTaskId());
-      shutdown();
       throw e;
     }
     finally {
@@ -168,10 +167,9 @@ public class KubernetesPeonLifecycle
     finally {
       try {
         saveLogs();
-        shutdown();
       }
       catch (Exception e) {
-        log.warn(e, "Task [%s] cleanup failed", taskId);
+        log.warn(e, "Log processing failed for task [%s]", taskId);
       }
 
       stopTask();
@@ -188,7 +186,7 @@ public class KubernetesPeonLifecycle
    */
   protected void shutdown()
   {
-    if (State.PENDING.equals(state.get()) || 
State.RUNNING.equals(state.get())) {
+    if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get()) 
|| State.STOPPED.equals(state.get())) {
       kubernetesClient.deletePeonJob(taskId);
     }
   }
@@ -223,7 +221,7 @@ public class KubernetesPeonLifecycle
    */
   protected TaskLocation getTaskLocation()
   {
-    if (!State.RUNNING.equals(state.get())) {
+    if (State.PENDING.equals(state.get()) || 
State.NOT_STARTED.equals(state.get())) {
       log.debug("Can't get task location for non-running job. [%s]", 
taskId.getOriginalTaskId());
       return TaskLocation.unknown();
     }
@@ -251,7 +249,6 @@ public class KubernetesPeonLifecycle
           
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED,
 "false")),
           pod.getMetadata() != null ? pod.getMetadata().getName() : ""
       );
-      log.info("K8s task %s is running at location %s", 
taskId.getOriginalTaskId(), taskLocation);
     }
 
     return taskLocation;
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 9a4e4bcca6..24e21b0b4e 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -182,10 +182,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         KubernetesWorkItem workItem = tasks.get(task.getId());
 
         if (workItem == null) {
-          throw new ISE("Task [%s] disappeared", task.getId());
-        }
-
-        if (workItem.isShutdownRequested()) {
           throw new ISE("Task [%s] has been shut down", task.getId());
         }
 
@@ -213,11 +209,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
       log.error(e, "Task [%s] execution caught an exception", task.getId());
       throw new RuntimeException(e);
     }
-    finally {
-      synchronized (tasks) {
-        tasks.remove(task.getId());
-      }
-    }
   }
 
   @VisibleForTesting
@@ -271,6 +262,10 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
       return;
     }
 
+    synchronized (tasks) {
+      tasks.remove(taskid);
+    }
+
     workItem.shutdown();
   }
 
@@ -440,6 +435,17 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
                 .collect(Collectors.toList());
   }
 
+  @Override
+  public TaskLocation getTaskLocation(String taskId)
+  {
+    final KubernetesWorkItem workItem = tasks.get(taskId);
+    if (workItem == null) {
+      return TaskLocation.unknown();
+    } else {
+      return workItem.getLocation();
+    }
+  }
+
   @Nullable
   @Override
   public RunnerTaskState getRunnerTaskState(String taskId)
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 164a82b6ae..94d4bbb67f 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -30,13 +30,10 @@ import 
org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.java.util.common.ISE;
 
 import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class KubernetesWorkItem extends TaskRunnerWorkItem
 {
   private final Task task;
-
-  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
   private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
 
   public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> 
statusFuture)
@@ -53,7 +50,6 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
 
   protected synchronized void shutdown()
   {
-    this.shutdownRequested.set(true);
 
     if (this.kubernetesPeonLifecycle != null) {
       this.kubernetesPeonLifecycle.startWatchingLogs();
@@ -61,11 +57,6 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
     }
   }
 
-  protected boolean isShutdownRequested()
-  {
-    return shutdownRequested.get();
-  }
-
   protected boolean isPending()
   {
     return RunnerTaskState.PENDING.equals(getRunnerTaskState());
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 3a017e5f74..084d1db62d 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -198,9 +198,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(null);
-    EasyMock.expect(kubernetesClient.deletePeonJob(
-        new K8sTaskId(ID)
-    )).andReturn(true);
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
     stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
     EasyMock.expectLastCall().once();
@@ -245,7 +242,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     replayAll();
 
@@ -298,7 +294,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -353,7 +348,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -408,7 +402,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -459,7 +452,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -512,7 +504,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -554,8 +545,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     logWatch.close();
     EasyMock.expectLastCall();
 
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
     replayAll();
@@ -908,8 +897,11 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.STOPPED);
+    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
 
+    replayAll();
     Assert.assertEquals(TaskLocation.unknown(), 
peonLifecycle.getTaskLocation());
+    verifyAll();
   }
 
   private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, 
KubernetesPeonLifecycle.State state)
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index ca1fc64171..bee3a533c7 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -152,8 +152,6 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assert.assertEquals(taskStatus, future.get());
 
     verifyAll();
-
-    Assert.assertFalse(runner.tasks.containsKey(task.getId()));
   }
 
   @Test
@@ -191,8 +189,6 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assert.assertTrue(e.getCause() instanceof RuntimeException);
 
     verifyAll();
-
-    Assert.assertFalse(runner.tasks.containsKey(task.getId()));
   }
 
   @Test
@@ -208,8 +204,6 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assert.assertEquals(taskStatus, future.get());
 
     verifyAll();
-
-    Assert.assertFalse(runner.tasks.containsKey(task.getId()));
   }
 
   @Test
@@ -236,28 +230,11 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assert.assertTrue(e.getCause() instanceof RuntimeException);
 
     verifyAll();
-
-    Assert.assertFalse(runner.tasks.containsKey(task.getId()));
-  }
-
-  @Test
-  public void test_doTask_withoutWorkItem_throwsRuntimeException()
-  {
-    Assert.assertThrows(
-        "Task [id] disappeared",
-        RuntimeException.class,
-        () -> runner.doTask(task, true)
-    );
   }
 
   @Test
   public void test_doTask_whenShutdownRequested_throwsRuntimeException()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
-    workItem.shutdown();
-
-    runner.tasks.put(task.getId(), workItem);
-
     Assert.assertThrows(
         "Task [id] has been shut down",
         RuntimeException.class,
@@ -266,13 +243,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   }
 
   @Test
-  public void test_shutdown_withoutExistingTask()
-  {
-    runner.shutdown(task.getId(), "");
-  }
-
-  @Test
-  public void test_shutdown_withExistingTask()
+  public void test_shutdown_withExistingTask_removesTaskFromMap()
   {
     KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
       @Override
@@ -282,7 +253,13 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     };
 
     runner.tasks.put(task.getId(), workItem);
+    runner.shutdown(task.getId(), "");
+    Assert.assertTrue(runner.tasks.isEmpty());
+  }
 
+  @Test
+  public void test_shutdown_withoutExistingTask()
+  {
     runner.shutdown(task.getId(), "");
   }
 
@@ -629,6 +606,30 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void test_getTaskLocation_withExistingTask()
+  {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+      @Override
+      public TaskLocation getLocation()
+      {
+        return TaskLocation.create("host", 0, 1, false);
+      }
+    };
+
+    runner.tasks.put(task.getId(), workItem);
+
+    TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+    Assert.assertEquals(TaskLocation.create("host", 0, 1, false), 
taskLocation);
+  }
+
+  @Test
+  public void test_getTaskLocation_noTaskFound()
+  {
+    TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+    Assert.assertEquals(TaskLocation.unknown(), taskLocation);
+  }
+
   @Test
   public void test_getTotalCapacity()
   {
@@ -644,6 +645,5 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assert.assertEquals(1, runner.getUsedCapacity());
     runner.tasks.remove(task.getId());
     Assert.assertEquals(0, runner.getUsedCapacity());
-
   }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index 5f95177048..d5cf2ea725 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -75,7 +75,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   public void test_shutdown_withoutKubernetesPeonLifecycle()
   {
     workItem.shutdown();
-    Assert.assertTrue(workItem.isShutdownRequested());
   }
 
   @Test
@@ -91,7 +90,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
 
     workItem.shutdown();
     verifyAll();
-    Assert.assertTrue(workItem.isShutdownRequested());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to