This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 44abe2b96f Fix bug in k8s task runner in handling deleted jobs (#14001)
44abe2b96f is described below

commit 44abe2b96f97b5e0d4e630809ae318d32f7f3351
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Mar 30 00:39:52 2023 -0400

    Fix bug in k8s task runner in handling deleted jobs (#14001)
    
    With the KubernetesTaskRunner, if a task is manually shut down via the web 
console while running, or the corresponding k8s job is manually deleted, the 
thread responsible for overseeing the task gets stuck in a loop because, when 
the job is deleted, the fabric8 client sends it a single event in which the job 
is null, and that event does not satisfy the wait condition.
    
    This means that the thread is stuck waiting on a fabric8 event (the job 
being successful) that will never arrive, and it only stops waiting once 
maxTaskDuration (default 4 hours) elapses. If a user of the extension is trying 
to use a limited taskqueue maxSize, this can cause problems as the k8s executor 
pool is unable to pick up additional tasks (since threads are stuck waiting on 
the old tasks that have already been deleted).
---
 .../overlord/common/DruidKubernetesPeonClient.java |  5 ++++-
 .../druid/k8s/overlord/common/JobResponse.java     |  2 +-
 .../common/DruidKubernetesPeonClientTest.java      | 26 ++++++++++++++++++----
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
index 226121d5a5..e60b3b6f8e 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
@@ -106,10 +106,13 @@ public class DruidKubernetesPeonClient implements 
KubernetesPeonClient
                       .inNamespace(namespace)
                       .withName(taskId.getK8sTaskId())
                       .waitUntilCondition(
-                          x -> x != null && x.getStatus() != null && 
x.getStatus().getActive() == null,
+                          x -> (x == null) || (x.getStatus() != null && 
x.getStatus().getActive() == null),
                           howLong,
                           unit
                       );
+      if (job == null) {
+        return new JobResponse(job, PeonPhase.FAILED);
+      }
       if (job.getStatus().getSucceeded() != null) {
         return new JobResponse(job, PeonPhase.SUCCEEDED);
       }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
index 0148b53cce..6f39944951 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
@@ -55,7 +55,7 @@ public class JobResponse
   {
     Optional<Long> duration = Optional.absent();
     try {
-      if (job.getStatus() != null
+      if (job != null && job.getStatus() != null
           && job.getStatus().getStartTime() != null
           && job.getStatus().getCompletionTime() != null) {
         duration = Optional.of((long) new Period(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
index da77d4b61e..a10bcb526e 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
@@ -30,8 +30,8 @@ import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Assertions;
@@ -60,9 +60,27 @@ public class DruidKubernetesPeonClientTest
     DruidKubernetesPeonClient client = new DruidKubernetesPeonClient(new 
TestKubernetesClient(this.client), "test",
                                                                      false
     );
-    Assertions.assertThrows(KubernetesClientTimeoutException.class, () -> {
-      client.waitForJobCompletion(new K8sTaskId("some-task"), 1, 
TimeUnit.SECONDS);
-    });
+    JobResponse jobResponse = client.waitForJobCompletion(new 
K8sTaskId("some-task"), 1, TimeUnit.SECONDS);
+    Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
+    Assertions.assertNull(jobResponse.getJob());
+  }
+
+  @Test
+  void testWaitingForAPodToGetReadySuccess()
+  {
+    DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new 
TestKubernetesClient(this.client), "test",
+        false
+    );
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName("sometask")
+        .endMetadata()
+        .withStatus(new 
JobStatusBuilder().withActive(null).withSucceeded(1).build())
+        .build();
+    client.batch().v1().jobs().inNamespace("test").create(job);
+    JobResponse jobResponse = peonClient.waitForJobCompletion(new 
K8sTaskId("sometask"), 1, TimeUnit.SECONDS);
+    Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase());
+    Assertions.assertEquals(job.getStatus().getSucceeded(), 
jobResponse.getJob().getStatus().getSucceeded());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to