This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 130bfbfc6d7 Revert "Separate task lifecycle from kubernetes/location lifecycle (#15133)" (#15346)
130bfbfc6d7 is described below

commit 130bfbfc6d7d30b6ee2efab90aea415245d9313e
Author: George Shiqi Wu <[email protected]>
AuthorDate: Wed Nov 8 13:12:30 2023 -0500

    Revert "Separate task lifecycle from kubernetes/location lifecycle (#15133)" (#15346)
    
    This reverts commit dc0b163e192545c802b7fe2b3271e035cc1e70ff.
---
 .../overlord/KubernetesAndWorkerTaskRunner.java    |   6 +
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  25 +---
 .../overlord/KubernetesPeonLifecycleFactory.java   |  10 +-
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  47 +++----
 .../k8s/overlord/KubernetesTaskRunnerConfig.java   |   2 +-
 .../druid/k8s/overlord/KubernetesWorkItem.java     |  23 +---
 .../druid/k8s/overlord/PeonLifecycleFactory.java   |   7 +-
 .../druid/k8s/overlord/common/JobResponse.java     |   9 +-
 .../k8s/overlord/common/KubernetesPeonClient.java  |   6 +-
 .../druid/k8s/overlord/common/PeonPhase.java       |  62 +++++++++
 .../KubernetesAndWorkerTaskRunnerTest.java         |   9 ++
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  | 140 ++++++---------------
 .../overlord/KubernetesTaskRunnerConfigTest.java   |   4 +-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 102 ++-------------
 .../druid/k8s/overlord/KubernetesWorkItemTest.java |  11 +-
 .../k8s/overlord/TestPeonLifecycleFactory.java     |   7 +-
 .../druid/k8s/overlord/common/JobResponseTest.java |   8 +-
 .../overlord/common/KubernetesPeonClientTest.java  |   3 +
 .../druid/k8s/overlord/common/PeonPhaseTest.java}  |  27 ++--
 .../DruidPeonClientIntegrationTest.java            |   5 +-
 .../test/resources/expectedEphemeralOutput.yaml    |   2 +-
 .../resources/expectedMultiContainerOutput.yaml    |   2 +-
 .../expectedMultiContainerOutputOrder.yaml         |   2 +-
 .../src/test/resources/expectedNoopJob.yaml        |   2 +-
 .../src/test/resources/expectedNoopJobLongIds.yaml |   2 +-
 .../test/resources/expectedNoopJobNoTaskJson.yaml  |   2 +-
 .../test/resources/expectedNoopJobTlsEnabled.yaml  |   2 +-
 .../src/test/resources/expectedPodSpec.yaml        |   2 +-
 .../resources/expectedSingleContainerOutput.yaml   |   2 +-
 .../common/actions/UpdateLocationAction.java       |  12 +-
 .../druid/indexing/common/task/AbstractTask.java   |  10 ++
 .../apache/druid/indexing/overlord/TaskRunner.java |   5 +
 .../common/actions/UpdateLocationActionTest.java   |  71 +++++++++++
 .../indexing/common/task/AbstractTaskTest.java     |   2 +-
 34 files changed, 299 insertions(+), 332 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index d8f4e9d84f2..2c45a0ec7b8 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -285,4 +285,10 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
   {
     kubernetesTaskRunner.updateStatus(task, status);
   }
+
+  @Override
+  public void updateLocation(Task task, TaskLocation location)
+  {
+    kubernetesTaskRunner.updateLocation(task, location);
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index ea3c2a19d1c..5c6c7c6b3eb 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -31,9 +31,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.indexing.overlord.TaskRunnerUtils;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
@@ -50,8 +47,6 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -94,8 +89,6 @@ public class KubernetesPeonLifecycle
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
   private final TaskStateListener stateListener;
-  private final List<Pair<TaskRunnerListener, Executor>> listeners;
-
   @MonotonicNonNull
   private LogWatch logWatch;
 
@@ -106,8 +99,7 @@ public class KubernetesPeonLifecycle
       KubernetesPeonClient kubernetesClient,
       TaskLogs taskLogs,
       ObjectMapper mapper,
-      TaskStateListener stateListener,
-      List<Pair<TaskRunnerListener, Executor>> listeners
+      TaskStateListener stateListener
   )
   {
     this.taskId = new K8sTaskId(task);
@@ -116,7 +108,6 @@ public class KubernetesPeonLifecycle
     this.taskLogs = taskLogs;
     this.mapper = mapper;
     this.stateListener = stateListener;
-    this.listeners = listeners;
   }
 
   /**
@@ -187,11 +178,7 @@ public class KubernetesPeonLifecycle
   {
     try {
       updateState(new State[]{State.NOT_STARTED, State.PENDING}, 
State.RUNNING);
-      TaskRunnerUtils.notifyLocationChanged(
-          listeners,
-          task.getId(),
-          getTaskLocation()
-      );
+
       JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
           taskId,
           timeout,
@@ -203,14 +190,12 @@ public class KubernetesPeonLifecycle
     finally {
       try {
         saveLogs();
-        shutdown();
       }
       catch (Exception e) {
-        log.warn(e, "Cleanup failed for task [%s]", taskId);
-      }
-      finally {
-        stopTask();
+        log.warn(e, "Log processing failed for task [%s]", taskId);
       }
+
+      stopTask();
     }
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
index 2998f3fc921..bf4e3a71257 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
@@ -21,14 +21,9 @@ package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.tasklogs.TaskLogs;
 
-import java.util.List;
-import java.util.concurrent.Executor;
-
 public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
 {
   private final KubernetesPeonClient client;
@@ -47,15 +42,14 @@ public class KubernetesPeonLifecycleFactory implements 
PeonLifecycleFactory
   }
 
   @Override
-  public KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener, 
List<Pair<TaskRunnerListener, Executor>> listeners)
+  public KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener)
   {
     return new KubernetesPeonLifecycle(
         task,
         client,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
   }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 56de37c3798..a0a29dcbbb9 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -146,20 +146,16 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      return tasks.computeIfAbsent(task.getId(), k -> {
-        ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
-        return new KubernetesWorkItem(task);
-      }).getResult();
+      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
+                  .getResult();
     }
   }
 
   protected ListenableFuture<TaskStatus> joinAsync(Task task)
   {
     synchronized (tasks) {
-      return tasks.computeIfAbsent(task.getId(), k -> {
-        ListenableFuture<TaskStatus> unused = exec.submit(() -> 
joinTask(task));
-        return new KubernetesWorkItem(task);
-      }).getResult();
+      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
+                  .getResult();
     }
   }
 
@@ -176,12 +172,10 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   @VisibleForTesting
   protected TaskStatus doTask(Task task, boolean run)
   {
-    TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution 
never started");
     try {
       KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
           task,
-          this::emitTaskStateMetrics,
-          listeners
+          this::emitTaskStateMetrics
       );
 
       synchronized (tasks) {
@@ -194,6 +188,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         workItem.setKubernetesPeonLifecycle(peonLifecycle);
       }
 
+      TaskStatus taskStatus;
       if (run) {
         taskStatus = peonLifecycle.run(
             adapter.fromTask(task),
@@ -206,17 +201,15 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
             config.getTaskTimeout().toStandardDuration().getMillis()
         );
       }
+
+      updateStatus(task, taskStatus);
+
       return taskStatus;
     }
     catch (Exception e) {
       log.error(e, "Task [%s] execution caught an exception", task.getId());
-      taskStatus = TaskStatus.failure(task.getId(), "Could not start task 
execution");
       throw new RuntimeException(e);
     }
-    finally {
-      updateStatus(task, taskStatus);
-      TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), 
TaskLocation.unknown());
-    }
   }
 
   @VisibleForTesting
@@ -249,15 +242,15 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   @Override
   public void updateStatus(Task task, TaskStatus status)
   {
-    KubernetesWorkItem workItem = tasks.get(task.getId());
-    if (workItem != null && !workItem.getResult().isDone() && 
status.isComplete()) {
-      workItem.setResult(status);
-    }
-
-    // Notify listeners even if the result is set to handle the shutdown case.
     TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
   }
 
+  @Override
+  public void updateLocation(Task task, TaskLocation location)
+  {
+    TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
+  }
+
   @Override
   public void shutdown(String taskid, String reason)
   {
@@ -424,16 +417,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
     final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, 
executor);
     log.debug("Registered listener [%s]", listener.getListenerId());
     listeners.add(listenerPair);
-
-    for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
-      if (entry.getValue().isRunning()) {
-        TaskRunnerUtils.notifyLocationChanged(
-            ImmutableList.of(listenerPair),
-            entry.getKey(),
-            entry.getValue().getLocation()
-        );
-      }
-    }
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 5fdcd9cfbb7..0d67c55b30a 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
   @JsonProperty
   @NotNull
   // how long to wait for the jobs to be cleaned up.
-  private Period taskCleanupDelay = new Period("PT1H");
+  private Period taskCleanupDelay = new Period("P2D");
 
   @JsonProperty
   @NotNull
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index b089b4dd2db..94d4bbb67f6 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -19,10 +19,9 @@
 
 package org.apache.druid.k8s.overlord;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -37,18 +36,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
   private final Task task;
   private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
 
-  private final SettableFuture<TaskStatus> result;
-
-  public KubernetesWorkItem(Task task)
-  {
-    this(task, SettableFuture.create());
-  }
-
-  @VisibleForTesting
-  public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
+  public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> 
statusFuture)
   {
-    super(task.getId(), result);
-    this.result = result;
+    super(task.getId(), statusFuture);
     this.task = task;
   }
 
@@ -61,7 +51,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
   protected synchronized void shutdown()
   {
 
-    if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
+    if (this.kubernetesPeonLifecycle != null) {
       this.kubernetesPeonLifecycle.startWatchingLogs();
       this.kubernetesPeonLifecycle.shutdown();
     }
@@ -129,9 +119,4 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
   {
     return task;
   }
-
-  public void setResult(TaskStatus status)
-  {
-    result.set(status);
-  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
index 2b180fb9dac..2a234ebc578 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
@@ -20,13 +20,8 @@
 package org.apache.druid.k8s.overlord;
 
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
-
-import java.util.List;
-import java.util.concurrent.Executor;
 
 public interface PeonLifecycleFactory
 {
-  KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener, 
List<Pair<TaskRunnerListener, Executor>> listeners);
+  KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener);
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
index 26fe5b98f0b..a7a8156468f 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
@@ -32,10 +32,12 @@ public class JobResponse
   private static final EmittingLogger LOGGER = new 
EmittingLogger(JobResponse.class);
 
   private final Job job;
+  private final PeonPhase phase;
 
-  public JobResponse(@Nullable Job job)
+  public JobResponse(@Nullable Job job, PeonPhase phase)
   {
     this.job = job;
+    this.phase = phase;
   }
 
   public Job getJob()
@@ -43,6 +45,11 @@ public class JobResponse
     return job;
   }
 
+  public PeonPhase getPhase()
+  {
+    return phase;
+  }
+
   public long getJobDuration()
   {
     long duration = -1L;
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 147ea732d0c..9fdc25fa645 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -101,13 +101,13 @@ public class KubernetesPeonClient
                       );
       if (job == null) {
         log.info("K8s job for the task [%s] was not found. It can happen if 
the task was canceled", taskId);
-        return new JobResponse(null);
+        return new JobResponse(null, PeonPhase.FAILED);
       }
       if (job.getStatus().getSucceeded() != null) {
-        return new JobResponse(job);
+        return new JobResponse(job, PeonPhase.SUCCEEDED);
       }
       log.warn("Task %s failed with status %s", taskId, job.getStatus());
-      return new JobResponse(job);
+      return new JobResponse(job, PeonPhase.FAILED);
     });
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
new file mode 100644
index 00000000000..6efcd34872b
--- /dev/null
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.api.model.Pod;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum PeonPhase
+{
+  PENDING("Pending"),
+  SUCCEEDED("Succeeded"),
+  FAILED("Failed"),
+  UNKNOWN("Unknown"),
+  RUNNING("Running");
+
+  private static final Map<String, PeonPhase> PHASE_MAP = 
Arrays.stream(PeonPhase.values())
+                                                                
.collect(Collectors.toMap(
+                                                                    
PeonPhase::getPhase,
+                                                                    
Function.identity()
+                                                                ));
+  private final String phase;
+
+  PeonPhase(String phase)
+  {
+    this.phase = phase;
+  }
+
+  public String getPhase()
+  {
+    return phase;
+  }
+
+  public static PeonPhase getPhaseFor(Pod pod)
+  {
+    if (pod == null) {
+      return UNKNOWN;
+    }
+    return PHASE_MAP.get(pod.getStatus().getPhase());
+  }
+
+}
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 40ca6fc2f2b..3ab515cc6e5 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -362,4 +362,13 @@ public class KubernetesAndWorkerTaskRunnerTest extends 
EasyMockSupport
     runner.updateStatus(task, TaskStatus.running(ID));
     verifyAll();
   }
+
+  @Test
+  public void test_updateLocation()
+  {
+    kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
+    replayAll();
+    runner.updateLocation(task, TaskLocation.unknown());
+    verifyAll();
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index e984e449b28..1c6e429a3dc 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -32,12 +31,11 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.k8s.overlord.common.JobResponse;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.tasklogs.TaskLogs;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -52,8 +50,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -69,8 +65,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
   @Mock LogWatch logWatch;
   @Mock KubernetesPeonLifecycle.TaskStateListener stateListener;
 
-  List<Pair<TaskRunnerListener, Executor>> listeners = ImmutableList.of();
-
   private ObjectMapper mapper;
   private Task task;
   private K8sTaskId k8sTaskId;
@@ -92,8 +86,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     )
     {
       @Override
@@ -138,8 +131,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     )
     {
       @Override
@@ -183,8 +175,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     )
     {
       @Override
@@ -233,8 +224,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     )
     {
       @Override
@@ -278,19 +268,15 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(null));
+    )).andReturn(new JobResponse(null, PeonPhase.FAILED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
     
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
@@ -316,15 +302,12 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
   @Test
   public void test_join() throws IOException
   {
-    Executor executor = EasyMock.mock(Executor.class);
-    TaskRunnerListener taskRunnerListener = 
EasyMock.mock(TaskRunnerListener.class);
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        ImmutableList.of(Pair.of(taskRunnerListener, executor))
+        stateListener
     );
 
     Job job = new JobBuilder()
@@ -342,12 +325,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(job));
+    )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
         IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8)
     ));
@@ -359,10 +338,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    executor.execute(EasyMock.anyObject());
-    EasyMock.expectLastCall();
-    taskRunnerListener.locationChanged(EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -384,8 +359,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     Job job = new JobBuilder()
@@ -401,15 +375,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(job));
+    )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
-    // Only update the location the first time, second call doesn't reach this 
point in the logic
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
-    // Always try to delete the job
-    
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true).times(2);
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
         Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8))
     );
@@ -452,8 +419,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     Job job = new JobBuilder()
@@ -469,12 +435,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(job));
+    )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
     
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
@@ -507,8 +469,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     Job job = new JobBuilder()
@@ -524,12 +485,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(job));
+    )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
@@ -562,8 +519,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     Job job = new JobBuilder()
@@ -579,11 +535,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(k8sTaskId),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
-    )).andReturn(new JobResponse(job));
+    )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
         Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8))
     );
@@ -596,9 +549,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     logWatch.close();
     EasyMock.expectLastCall();
 
-    // We should still try to cleanup the Job after
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
     replayAll();
@@ -619,8 +569,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
 
     EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
@@ -629,9 +578,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andThrow(new RuntimeException());
 
-    
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
-        Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
-    );
     // We should still try to push logs
     
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
@@ -642,7 +588,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-    EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
+
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
     replayAll();
@@ -662,8 +608,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     peonLifecycle.shutdown();
   }
@@ -676,8 +621,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.PENDING);
 
@@ -698,8 +642,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -720,8 +663,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.STOPPED);
 
@@ -736,8 +678,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.NOT_STARTED);
 
@@ -752,8 +693,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.PENDING);
 
@@ -768,8 +708,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -792,8 +731,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.STOPPED);
 
@@ -809,8 +747,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.NOT_STARTED);
 
@@ -826,8 +763,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.PENDING);
 
@@ -843,8 +779,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -866,8 +801,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -895,8 +829,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -932,8 +865,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -969,8 +901,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.RUNNING);
 
@@ -1007,8 +938,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         kubernetesClient,
         taskLogs,
         mapper,
-        stateListener,
-        listeners
+        stateListener
     );
     setPeonLifecycleState(peonLifecycle, 
KubernetesPeonLifecycle.State.STOPPED);
     
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
index 579b7539d81..1f4a7281f64 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
@@ -47,7 +47,7 @@ public class KubernetesTaskRunnerConfigTest
     Assert.assertNull(config.getGraceTerminationPeriodSeconds());
     Assert.assertTrue(config.isDisableClientProxy());
     Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout());
-    Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay());
+    Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay());
     Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval());
     Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout());
     Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors());
@@ -72,7 +72,7 @@ public class KubernetesTaskRunnerConfigTest
     Assert.assertNull(config.getGraceTerminationPeriodSeconds());
     Assert.assertTrue(config.isDisableClientProxy());
     Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout());
-    Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay());
+    Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay());
     Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval());
     Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout());
     Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors());
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index e04ef630036..36a7b4cfcd9 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -28,10 +28,8 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
@@ -78,9 +76,6 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
   @Mock private ServiceEmitter emitter;
 
-  @Mock private Executor executor;
-  @Mock private TaskRunnerListener taskRunnerListener;
-
   private KubernetesTaskRunnerConfig config;
   private KubernetesTaskRunner runner;
   private Task task;
@@ -121,7 +116,11 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
       {
         return tasks.computeIfAbsent(
             task.getId(),
-            k -> new KubernetesWorkItem(task)).getResult();
+            k -> new KubernetesWorkItem(
+                task,
+                Futures.immediateFuture(TaskStatus.success(task.getId()))
+            )
+        ).getResult();
       }
     };
 
@@ -250,7 +249,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   }
 
   @Test
-  public void test_run_whenExceptionThrown_throwsRuntimeException() throws 
Exception
+  public void test_run_whenExceptionThrown_throwsRuntimeException() throws 
IOException
   {
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -270,89 +269,11 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     replayAll();
 
     ListenableFuture<TaskStatus> future = runner.run(task);
-    TaskStatus taskStatus = future.get();
-    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
-    Assert.assertEquals("Could not start task execution", 
taskStatus.getErrorMsg());
-    verifyAll();
-  }
 
-  @Test
-  public void test_run_updateStatus() throws ExecutionException, 
InterruptedException
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    );
+    Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+    Assert.assertTrue(e.getCause() instanceof RuntimeException);
 
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task);
-    runner.tasks.put(task.getId(), workItem);
-    TaskStatus completeTaskStatus = TaskStatus.success(task.getId());
-
-    replayAll();
-    runner.updateStatus(task, completeTaskStatus);
     verifyAll();
-
-    assertTrue(workItem.getResult().isDone());
-    assertEquals(completeTaskStatus, workItem.getResult().get());
-  }
-
-  @Test
-  public void test_run_updateStatus_running()
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    );
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task);
-    runner.tasks.put(task.getId(), workItem);
-    TaskStatus runningTaskStatus = TaskStatus.running(task.getId());
-
-    replayAll();
-    runner.updateStatus(task, runningTaskStatus);
-    verifyAll();
-
-    assertFalse(workItem.getResult().isDone());
-  }
-
-  @Test
-  public void test_registerListener_runningTask()
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    );
-
-    KubernetesPeonLifecycle runningKubernetesPeonLifecycle = 
EasyMock.mock(KubernetesPeonLifecycle.class);
-    
EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING);
-    
EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown());
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task);
-    workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle);
-    runner.tasks.put(task.getId(), workItem);
-
-    Executor executor = EasyMock.mock(Executor.class);
-    TaskRunnerListener taskRunnerListener = 
EasyMock.mock(TaskRunnerListener.class);
-    executor.execute(EasyMock.anyObject());
-    EasyMock.expectLastCall();
-    taskRunnerListener.locationChanged(EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall();
-
-    replayAll();
-    EasyMock.replay(runningKubernetesPeonLifecycle);
-    runner.registerListener(taskRunnerListener, executor);
-    verifyAll();
-    EasyMock.verify(runningKubernetesPeonLifecycle);
   }
 
   @Test
@@ -382,15 +303,16 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   }
 
   @Test
-  public void test_join_whenExceptionThrown_throwsRuntimeException() throws 
ExecutionException, InterruptedException
+  public void test_join_whenExceptionThrown_throwsRuntimeException()
   {
     
EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new 
IllegalStateException());
 
     replayAll();
 
     ListenableFuture<TaskStatus> future = runner.joinAsync(task);
-    TaskStatus taskStatus = future.get();
-    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+
+    Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+    Assert.assertTrue(e.getCause() instanceof RuntimeException);
 
     verifyAll();
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index f2f398658e0..7d17193b171 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -56,7 +56,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ));
 
@@ -67,7 +66,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
             null,
             null,
             null,
-            null,
             null
         ))
     );
@@ -82,8 +80,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_shutdown_withKubernetesPeonLifecycle()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task);
-
     kubernetesPeonLifecycle.shutdown();
     EasyMock.expectLastCall();
     kubernetesPeonLifecycle.startWatchingLogs();
@@ -91,6 +87,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
 
     replayAll();
     workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
+
     workItem.shutdown();
     verifyAll();
   }
@@ -161,7 +158,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ));
 
@@ -176,7 +172,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ) {
       @Override
@@ -199,7 +194,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ) {
       @Override
@@ -222,7 +216,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ) {
       @Override
@@ -251,7 +244,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ));
     Assert.assertFalse(workItem.streamTaskLogs().isPresent());
@@ -271,7 +263,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     ));
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
index fa0f79bc1d2..8b8c43c0d71 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
@@ -20,11 +20,6 @@
 package org.apache.druid.k8s.overlord;
 
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
-
-import java.util.List;
-import java.util.concurrent.Executor;
 
 public class TestPeonLifecycleFactory implements PeonLifecycleFactory
 {
@@ -36,7 +31,7 @@ public class TestPeonLifecycleFactory implements 
PeonLifecycleFactory
   }
 
   @Override
-  public KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener, 
List<Pair<TaskRunnerListener, Executor>> listeners)
+  public KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener)
   {
     return kubernetesPeonLifecycle;
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
index cf9f345fd7a..2e2043578aa 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
@@ -39,7 +39,7 @@ class JobResponseTest
         .endStatus()
         .build();
 
-    JobResponse response = new JobResponse(job);
+    JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
 
     Assertions.assertEquals(58000L, response.getJobDuration());
   }
@@ -56,7 +56,7 @@ class JobResponseTest
         .endStatus()
         .build();
 
-    JobResponse response = new JobResponse(job);
+    JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
 
     Assertions.assertEquals(-1, response.getJobDuration());
   }
@@ -70,7 +70,7 @@ class JobResponseTest
         .endMetadata()
         .build();
 
-    JobResponse response = new JobResponse(job);
+    JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
 
     Assertions.assertEquals(-1, response.getJobDuration());
   }
@@ -78,7 +78,7 @@ class JobResponseTest
   @Test
   void testNullJob()
   {
-    JobResponse response = new JobResponse(null);
+    JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED);
     long duration = response.getJobDuration();
     Assertions.assertEquals(-1, duration);
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index cde7faa473b..f6096b675d6 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -153,6 +153,7 @@ public class KubernetesPeonClientTest
         TimeUnit.SECONDS
     );
 
+    Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase());
     Assertions.assertNotNull(jobResponse.getJob());
   }
 
@@ -177,6 +178,7 @@ public class KubernetesPeonClientTest
         TimeUnit.SECONDS
     );
 
+    Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
     Assertions.assertNotNull(jobResponse.getJob());
   }
 
@@ -189,6 +191,7 @@ public class KubernetesPeonClientTest
         TimeUnit.SECONDS
     );
 
+    Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
     Assertions.assertNull(jobResponse.getJob());
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
similarity index 53%
copy from 
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
copy to 
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
index 2b180fb9dac..3f6bd71312b 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
@@ -17,16 +17,27 @@
  * under the License.
  */
 
-package org.apache.druid.k8s.overlord;
+package org.apache.druid.k8s.overlord.common;
 
-import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import org.junit.jupiter.api.Test;
 
-import java.util.List;
-import java.util.concurrent.Executor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public interface PeonLifecycleFactory
+public class PeonPhaseTest
 {
-  KubernetesPeonLifecycle build(Task task, 
KubernetesPeonLifecycle.TaskStateListener stateListener, 
List<Pair<TaskRunnerListener, Executor>> listeners);
+
+  @Test
+  void testGetPhaseForToMakeCoverageHappy()
+  {
+    Pod pod = mock(Pod.class);
+    PodStatus status = mock(PodStatus.class);
+    when(status.getPhase()).thenReturn("Succeeded");
+    when(pod.getStatus()).thenReturn(status);
+    assertEquals(PeonPhase.UNKNOWN, PeonPhase.getPhaseFor(null));
+    assertEquals(PeonPhase.SUCCEEDED, PeonPhase.getPhaseFor(pod));
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 2cc5bf15c65..09816168588 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -38,11 +38,13 @@ import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.common.JobResponse;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
+import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.junit.jupiter.api.BeforeEach;
@@ -182,8 +184,9 @@ public class DruidPeonClientIntegrationTest
     assertEquals(task, taskFromPod);
 
 
-    peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES);
+    JobResponse jobStatusResult = peonClient.waitForPeonJobCompletion(taskId, 
2, TimeUnit.MINUTES);
     thread.join();
+    assertEquals(PeonPhase.SUCCEEDED, jobStatusResult.getPhase());
     // as long as there were no exceptions we are good!
     assertEquals(expectedLogs, actualLogs);
     // cleanup my job
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
index 741a032eb6c..30960cdbc66 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
@@ -62,4 +62,4 @@ spec:
               ephemeral-storage: 1Gi
       hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
       restartPolicy: "Never"
-  ttlSecondsAfterFinished: 3600
\ No newline at end of file
+  ttlSecondsAfterFinished: 172800
\ No newline at end of file
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
index 73f31ddfb50..70b8b7c1d24 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
@@ -105,4 +105,4 @@ spec:
           name: "graveyard"
         - emptyDir: {}
           name: "kubexit"
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
index 73f31ddfb50..70b8b7c1d24 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
@@ -105,4 +105,4 @@ spec:
           name: "graveyard"
         - emptyDir: {}
           name: "kubexit"
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index 004fed9585a..2cef837f397 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -16,7 +16,7 @@ metadata:
 spec:
   activeDeadlineSeconds: 14400
   backoffLimit: 0
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
   template:
     metadata:
       labels:
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index b6ca8a2cefe..cf16c49c5db 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -16,7 +16,7 @@ metadata:
 spec:
   activeDeadlineSeconds: 14400
   backoffLimit: 0
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
   template:
     metadata:
       labels:
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 8ecdaf50b01..d72d0ef37b0 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -16,7 +16,7 @@ metadata:
 spec:
   activeDeadlineSeconds: 14400
   backoffLimit: 0
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
   template:
     metadata:
       labels:
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
index 547887e9084..a230ac913a6 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -16,7 +16,7 @@ metadata:
 spec:
   activeDeadlineSeconds: 14400
   backoffLimit: 0
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
   template:
     metadata:
       labels:
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
index ecd9416c563..e46de133788 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
@@ -104,4 +104,4 @@ spec:
           name: "graveyard"
         - emptyDir: {}
           name: "kubexit"
-  ttlSecondsAfterFinished: 3600
+  ttlSecondsAfterFinished: 172800
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
index 7afc393c56a..f270368fb55 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
@@ -57,4 +57,4 @@ spec:
               cpu: "1000m"
               memory: "2400000000"
       restartPolicy: "Never"
-  ttlSecondsAfterFinished: 3600
\ No newline at end of file
+  ttlSecondsAfterFinished: 172800
\ No newline at end of file
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
index 088169d3a53..f4926864dcb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
@@ -23,15 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Optional;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
 
-/* This class was added for mm-less ingestion in order to let the peon manage 
its own location lifecycle by submitting
-actions to the overlord. https://github.com/apache/druid/pull/15133 moved this 
location logic to the overlord itself
-so this Action is no longer needed. For backwards compatibility with old 
peons, this class was left in but can be deprecated
-for a later druid release.
-*/
-@Deprecated
 public class UpdateLocationAction implements TaskAction<Void>
 {
   @JsonIgnore
@@ -62,6 +58,10 @@ public class UpdateLocationAction implements TaskAction<Void>
   @Override
   public Void perform(Task task, TaskActionToolbox toolbox)
   {
+    Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
+    if (taskRunner.isPresent()) {
+      taskRunner.get().updateLocation(task, taskLocation);
+    }
     return null;
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index cd17160f3aa..e8770c512dc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -25,11 +25,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.UpdateLocationAction;
 import org.apache.druid.indexing.common.actions.UpdateStatusAction;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
@@ -40,12 +42,14 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.segment.indexing.BatchIOConfig;
+import org.apache.druid.server.DruidNode;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.HashMap;
@@ -151,6 +155,11 @@ public abstract class AbstractTask implements Task
       FileUtils.mkdirp(attemptDir);
       reportsFile = new File(attemptDir, "report.json");
       statusFile = new File(attemptDir, "status.json");
+      InetAddress hostName = InetAddress.getLocalHost();
+      DruidNode node = toolbox.getTaskExecutorNode();
+      toolbox.getTaskActionClient().submit(new 
UpdateLocationAction(TaskLocation.create(
+          hostName.getHostAddress(), node.getPlaintextPort(), 
node.getTlsPort(), node.isEnablePlaintextPort()
+      )));
     }
     log.debug("Task setup complete");
     return null;
@@ -203,6 +212,7 @@ public abstract class AbstractTask implements Task
     // report back to the overlord
     UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport);
     toolbox.getTaskActionClient().submit(status);
+    toolbox.getTaskActionClient().submit(new 
UpdateLocationAction(TaskLocation.unknown()));
 
     if (reportsFile != null && reportsFile.exists()) {
       toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 99b0c05b832..ac1fd124ef5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -141,6 +141,11 @@ public interface TaskRunner
     // do nothing
   }
 
+  default void updateLocation(Task task, TaskLocation location)
+  {
+    // do nothing
+  }
+
   /**
    * The maximum number of tasks this TaskRunner can run concurrently.
    * Can return -1 if this method is not implemented or capacity can't be 
found.
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
new file mode 100644
index 00000000000..83aeb382dc0
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.common.base.Optional;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class UpdateLocationActionTest
+{
+  @Test
+  public void testFlow() throws UnknownHostException
+  {
+    // a runner is present: the action must forward task and location to it
+    InetAddress hostName = InetAddress.getLocalHost();
+    TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 
1, 2);
+    UpdateLocationAction action = new UpdateLocationAction(myLocation);
+    Task task = NoopTask.create();
+    TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+    TaskRunner runner = mock(TaskRunner.class);
+    when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner));
+    action.perform(task, toolbox);
+    verify(runner, times(1)).updateLocation(eq(task), eq(myLocation));
+  }
+
+  @Test
+  public void testWithNoTaskRunner() throws UnknownHostException
+  {
+    // no runner in the toolbox: no location update may reach any runner
+    InetAddress hostName = InetAddress.getLocalHost();
+    TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 
1, 2);
+    UpdateLocationAction action = new UpdateLocationAction(myLocation);
+    Task task = NoopTask.create();
+    TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+    TaskRunner runner = mock(TaskRunner.class);
+    when(toolbox.getTaskRunner()).thenReturn(Optional.absent());
+    action.perform(task, toolbox);
+    verify(runner, never()).updateLocation(any(), any());
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index bcd2f086fd0..ba210fee228 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -109,7 +109,7 @@ public class AbstractTaskTest
     task.run(toolbox);
 
     // call it 3 times, once to update location in setup, then one for status 
and location in cleanup
-    Mockito.verify(taskActionClient, times(1)).submit(any());
+    Mockito.verify(taskActionClient, times(3)).submit(any());
     verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
     verify(pusher, times(1)).pushTaskStatus(eq("myID"), any());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to