This is an automated email from the ASF dual-hosted git repository.

davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d81d13b9ba Pod template task adapter (#13896)
d81d13b9ba is described below

commit d81d13b9ba5b81ed098230dd5fbf24f39ea5710f
Author: Nicholas Lippis <[email protected]>
AuthorDate: Wed Mar 22 16:20:24 2023 -0400

    Pod template task adapter (#13896)
    
    * Pod template task adapter
    
    * Use getBaseTaskDirPaths
    
    * Remove unused task from getEnv
    
    * Use Optional.ifPresent() instead of Optional.map()
    
    * Pass absolute path
    
    * Don't pass task to getEnv
    
    * Assert the correct adapter is created
    
    * Javadocs and Comments
    
    * Add exception message to assertions
---
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  88 +----
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  23 +-
 .../k8s/overlord/common/DruidK8sConstants.java     |   4 +
 .../druid/k8s/overlord/common/K8sTaskAdapter.java  | 100 +++++-
 .../overlord/common/MultiContainerTaskAdapter.java |  14 +-
 .../overlord/common/PodTemplateTaskAdapter.java    | 259 ++++++++++++++
 .../common/SingleContainerTaskAdapter.java         |  12 +-
 .../druid/k8s/overlord/common/TaskAdapter.java     |   8 +-
 .../overlord/KubernetesTaskRunnerFactoryTest.java  | 146 ++++++++
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |  75 +---
 .../common/DruidPeonClientIntegrationTest.java     |  55 ++-
 .../k8s/overlord/common/K8sTaskAdapterTest.java    |  54 ++-
 .../common/MultiContainerTaskAdapterTest.java      |  60 +++-
 .../common/PodTemplateTaskAdapterTest.java         | 395 +++++++++++++++++++++
 .../common/SingleContainerTaskAdapterTest.java     |  51 ++-
 .../src/test/resources/basePod.yaml                |  23 ++
 .../src/test/resources/basePodTemplate.yaml        |  12 +
 .../test/resources/basePodWithoutAnnotations.yaml  |  21 ++
 .../src/test/resources/expectedNoopJob.yaml        |  40 +++
 .../test/resources/expectedNoopJobTlsEnabled.yaml  |  40 +++
 20 files changed, 1291 insertions(+), 189 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 44723db377..ae2dbbb885 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -35,38 +34,29 @@ import org.apache.commons.io.FileUtils;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.ForkingTaskRunner;
-import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerUtils;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
-import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
 import org.apache.druid.k8s.overlord.common.JobResponse;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
-import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.k8s.overlord.common.TaskAdapter;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -101,36 +91,28 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   private final ScheduledExecutorService cleanupExecutor;
 
   protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new 
ConcurrentHashMap<>();
-  private final StartupLoggingConfig startupLoggingConfig;
-  private final TaskAdapter<Pod, Job> adapter;
+  protected final TaskAdapter adapter;
 
   private final KubernetesTaskRunnerConfig k8sConfig;
   private final TaskQueueConfig taskQueueConfig;
   private final TaskLogPusher taskLogPusher;
   private final ListeningExecutorService exec;
   private final KubernetesPeonClient client;
-  private final DruidNode node;
-  private final TaskConfig taskConfig;
 
 
   public KubernetesTaskRunner(
-      StartupLoggingConfig startupLoggingConfig,
-      TaskAdapter<Pod, Job> adapter,
+      TaskAdapter adapter,
       KubernetesTaskRunnerConfig k8sConfig,
       TaskQueueConfig taskQueueConfig,
       TaskLogPusher taskLogPusher,
-      KubernetesPeonClient client,
-      DruidNode node,
-      TaskConfig taskConfig
+      KubernetesPeonClient client
   )
   {
-    this.startupLoggingConfig = startupLoggingConfig;
     this.adapter = adapter;
     this.k8sConfig = k8sConfig;
     this.taskQueueConfig = taskQueueConfig;
     this.taskLogPusher = taskLogPusher;
     this.client = client;
-    this.node = node;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
@@ -139,7 +121,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
         "The task queue bounds how many concurrent k8s tasks you can have"
     );
-    this.taskConfig = taskConfig;
   }
 
 
@@ -163,13 +144,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
                   JobResponse completedPhase;
                   Optional<Job> existingJob = client.jobExists(k8sTaskId);
                   if (!existingJob.isPresent()) {
-                    PeonCommandContext context = new PeonCommandContext(
-                        generateCommand(task),
-                        javaOpts(task),
-                        new File(taskConfig.getBaseTaskDirPaths().get(0)),
-                        node.isEnableTlsPort()
-                    );
-                    Job job = adapter.fromTask(task, context);
+                    Job job = adapter.fromTask(task);
                     log.info("Job created %s and ready to launch", k8sTaskId);
                     Pod peonPod = client.launchJobAndWaitForStart(
                         job,
@@ -313,61 +288,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
     return ImmutableMap.of("taskQueue", (long) taskQueueConfig.getMaxSize());
   }
 
-  private List<String> javaOpts(Task task)
-  {
-    final List<String> javaOpts = new ArrayList<>();
-    Iterables.addAll(javaOpts, k8sConfig.javaOptsArray);
-
-    // Override task specific javaOpts
-    Object taskJavaOpts = task.getContextValue(
-        ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
-    );
-    if (taskJavaOpts != null) {
-      Iterables.addAll(
-          javaOpts,
-          new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
-      );
-    }
-
-    javaOpts.add(StringUtils.format("-Ddruid.port=%d", 
DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.plaintextPort=%d", 
DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.tlsPort=%d", 
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
-    javaOpts.add(StringUtils.format(
-        "-Ddruid.task.executor.tlsPort=%d",
-        node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
-    ));
-    javaOpts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", 
node.isEnableTlsPort())
-    );
-    return javaOpts;
-  }
-
-  private List<String> generateCommand(Task task)
-  {
-    final List<String> command = new ArrayList<>();
-    command.add("/peon.sh");
-    command.add(taskConfig.getBaseTaskDirPaths().get(0));
-    command.add(task.getId());
-    command.add("1"); // the attemptId is always 1, we never run the task 
twice on the same pod.
-
-    String nodeType = task.getNodeType();
-    if (nodeType != null) {
-      command.add("--nodeType");
-      command.add(nodeType);
-    }
-
-    // If the task type is queryable, we need to load broadcast segments on 
the peon, used for
-    // join queries
-    if (task.supportsQueries()) {
-      command.add("--loadBroadcastSegments");
-      command.add("true");
-    }
-    log.info(
-        "Peon Command for K8s job: %s",
-        
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), 
command)
-    );
-    return command;
-  }
-
   @Override
   public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
   {
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 759752753b..d143eb6840 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -86,20 +86,31 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
 
     K8sTaskAdapter adapter;
     if (kubernetesTaskRunnerConfig.sidecarSupport) {
-      adapter = new MultiContainerTaskAdapter(client, 
kubernetesTaskRunnerConfig, smileMapper);
+      adapter = new MultiContainerTaskAdapter(
+          client,
+          kubernetesTaskRunnerConfig,
+          taskConfig,
+          startupLoggingConfig,
+          druidNode,
+          smileMapper
+      );
     } else {
-      adapter = new SingleContainerTaskAdapter(client, 
kubernetesTaskRunnerConfig, smileMapper);
+      adapter = new SingleContainerTaskAdapter(
+          client,
+          kubernetesTaskRunnerConfig,
+          taskConfig,
+          startupLoggingConfig,
+          druidNode,
+          smileMapper
+      );
     }
 
     runner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        new DruidKubernetesPeonClient(client, 
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
-        druidNode,
-        taskConfig
+        new DruidKubernetesPeonClient(client, 
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs)
     );
     return runner;
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index abdb74ee0c..a43c9b908f 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -23,7 +23,11 @@ import com.google.common.base.Predicate;
 
 public class DruidK8sConstants
 {
+  public static final String TASK = "task";
   public static final String TASK_ID = "task.id";
+  public static final String TASK_TYPE = "task.type";
+  public static final String TASK_GROUP_ID = "task.group.id";
+  public static final String TASK_DATASOURCE = "task.datasource";
   public static final int PORT = 8100;
   public static final int TLS_PORT = 8091;
   public static final String TLS_ENABLED = "tls.enabled";
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
index b067211399..bfc87d7c86 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
@@ -40,12 +40,20 @@ import 
io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ForkingTaskRunner;
+import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
+import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -63,33 +71,48 @@ import java.util.Optional;
  * to add some extra coordination to shut down sidecar containers when the 
main pod exits.
  */
 
-public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
+public abstract class K8sTaskAdapter implements TaskAdapter
 {
 
   private static final EmittingLogger log = new 
EmittingLogger(K8sTaskAdapter.class);
 
   protected final KubernetesClientApi client;
-  protected final KubernetesTaskRunnerConfig config;
+  protected final KubernetesTaskRunnerConfig taskRunnerConfig;
+  protected final TaskConfig taskConfig;
+  protected final StartupLoggingConfig startupLoggingConfig;
+  protected final DruidNode node;
   protected final ObjectMapper mapper;
 
   public K8sTaskAdapter(
       KubernetesClientApi client,
-      KubernetesTaskRunnerConfig config,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      StartupLoggingConfig startupLoggingConfig,
+      DruidNode node,
       ObjectMapper mapper
   )
   {
     this.client = client;
-    this.config = config;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.startupLoggingConfig = startupLoggingConfig;
+    this.node = node;
     this.mapper = mapper;
   }
 
   @Override
-  public Job fromTask(Task task, PeonCommandContext context) throws IOException
+  public Job fromTask(Task task) throws IOException
   {
     String myPodName = System.getenv("HOSTNAME");
-    Pod pod = client.executeRequest(client -> 
client.pods().inNamespace(config.namespace).withName(myPodName).get());
+    Pod pod = client.executeRequest(client -> 
client.pods().inNamespace(taskRunnerConfig.namespace).withName(myPodName).get());
+    PeonCommandContext context = new PeonCommandContext(
+        generateCommand(task),
+        javaOpts(task),
+        new File(taskConfig.getBaseTaskDirPaths().get(0)),
+        node.isEnableTlsPort()
+    );
     PodSpec podSpec = pod.getSpec();
-    massageSpec(podSpec, config.primaryContainerName);
+    massageSpec(podSpec, taskRunnerConfig.primaryContainerName);
     return createJobFromPodSpec(podSpec, task, context);
   }
 
@@ -125,9 +148,9 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter<Pod, Job>
         .endMetadata()
         .withNewSpec()
         .withTemplate(podTemplate)
-        
.withActiveDeadlineSeconds(config.maxTaskDuration.toStandardDuration().getStandardSeconds())
+        
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
         .withBackoffLimit(0)
-        .withTtlSecondsAfterFinished((int) 
config.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+        .withTtlSecondsAfterFinished((int) 
taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
         .endSpec()
         .build();
   }
@@ -245,7 +268,7 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter<Pod, Job>
 
   protected Map<String, String> addJobSpecificAnnotations(PeonCommandContext 
context, K8sTaskId k8sTaskId)
   {
-    Map<String, String> annotations = config.annotations;
+    Map<String, String> annotations = taskRunnerConfig.annotations;
     annotations.put(DruidK8sConstants.TASK_ID, k8sTaskId.getOriginalTaskId());
     annotations.put(DruidK8sConstants.TLS_ENABLED, 
String.valueOf(context.isEnableTls()));
     return annotations;
@@ -253,7 +276,7 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter<Pod, Job>
 
   protected Map<String, String> addJobSpecificLabels()
   {
-    Map<String, String> labels = config.labels;
+    Map<String, String> labels = taskRunnerConfig.labels;
     labels.put(DruidK8sConstants.LABEL_KEY, "true");
     return labels;
   }
@@ -269,7 +292,7 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter<Pod, Job>
     podSpec.setNodeName(null);
     podSpec.setRestartPolicy("Never");
     podSpec.setHostname(k8sTaskId.getK8sTaskId());
-    
podSpec.setTerminationGracePeriodSeconds(config.graceTerminationPeriodSeconds);
+    
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.graceTerminationPeriodSeconds);
 
     PodTemplateSpec podTemplate = new PodTemplateSpec();
     ObjectMeta objectMeta = new ObjectMeta();
@@ -304,4 +327,57 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter<Pod, Job>
     }
   }
 
+  private List<String> javaOpts(Task task)
+  {
+    final List<String> javaOpts = new ArrayList<>();
+    Iterables.addAll(javaOpts, taskRunnerConfig.javaOptsArray);
+
+    // Override task specific javaOpts
+    Object taskJavaOpts = task.getContextValue(
+        ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+    );
+    if (taskJavaOpts != null) {
+      Iterables.addAll(
+          javaOpts,
+          new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+      );
+    }
+
+    
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d",
 DruidK8sConstants.PORT));
+    
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d",
 DruidK8sConstants.PORT));
+    
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d",
 node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
+        "-Ddruid.task.executor.tlsPort=%d",
+        node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
+    ));
+    
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s",
 node.isEnableTlsPort())
+    );
+    return javaOpts;
+  }
+
+  private List<String> generateCommand(Task task)
+  {
+    final List<String> command = new ArrayList<>();
+    command.add("/peon.sh");
+    command.add(new 
File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath());
+    command.add("1"); // the attemptId is always 1, we never run the task 
twice on the same pod.
+
+    String nodeType = task.getNodeType();
+    if (nodeType != null) {
+      command.add("--nodeType");
+      command.add(nodeType);
+    }
+
+    // If the task type is queryable, we need to load broadcast segments on 
the peon, used for
+    // join queries
+    if (task.supportsQueries()) {
+      command.add("--loadBroadcastSegments");
+      command.add("true");
+    }
+    log.info(
+        "Peon Command for K8s job: %s",
+        
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), 
command)
+    );
+    return command;
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
index df890ede6d..f17dbaaa0a 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
@@ -34,8 +34,11 @@ import io.fabric8.kubernetes.api.model.VolumeMount;
 import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -44,13 +47,18 @@ import java.util.Map;
 
 public class MultiContainerTaskAdapter extends K8sTaskAdapter
 {
+  public static String TYPE = "MultiContainer";
+
   public MultiContainerTaskAdapter(
       KubernetesClientApi client,
-      KubernetesTaskRunnerConfig config,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      StartupLoggingConfig startupLoggingConfig,
+      DruidNode druidNode,
       ObjectMapper mapper
   )
   {
-    super(client, config, mapper);
+    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper);
   }
 
   @Override
@@ -87,7 +95,7 @@ public class MultiContainerTaskAdapter extends K8sTaskAdapter
   {
     return new ContainerBuilder()
         .withName("kubexit")
-        .withImage(config.kubexitImage)
+        .withImage(taskRunnerConfig.kubexitImage)
         .withCommand("cp", "/bin/kubexit", "/kubexit/kubexit")
         .withVolumeMounts(new 
VolumeMountBuilder().withMountPath("/kubexit").withName("kubexit").build())
         .build();
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
new file mode 100644
index 0000000000..0c8b8108de
--- /dev/null
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A PodTemplate {@link TaskAdapter} to transform tasks to kubernetes jobs and 
kubernetes pods to tasks
+ *
+ * Pod Templates
+ * This TaskAdapter allows the user to provide a pod template per druid task type.  
If no pod template has
+ * been provided for a task's type, then the provided base template will be used.
+ *
+ * Providing Pod Templates per Task
+ * Pod templates are provided as files; the path of each pod template file must be 
specified as a runtime property keyed by task type:
+ * druid.indexer.runner.k8s.podTemplate.{taskType}=/path/to/podTemplate.yaml
+ *
+ * Note that the base pod template must be specified as the runtime property
+ * druid.indexer.runner.k8s.podTemplate.base=/path/to/podTemplate.yaml
+ */
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = 
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + 
".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  /**
+   * Create a {@link Job} from a {@link Task}
+   *
+   * 1. Select pod template based on task type
+   * 2. Add labels and annotations to the pod template including the task as a 
compressed and base64 encoded string
+   * 3. Add labels and annotations to the job
+   * 4. Add user specified active deadline seconds and job ttl
+   * 5. Set backoff limit to zero since druid does not support external 
systems retrying failed tasks
+   *
+   * @param task
+   * @return {@link Job}
+   * @throws IOException
+   */
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), 
templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", 
task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())
+        .editTemplate()
+        .editOrNewMetadata()
+        .addToAnnotations(getPodTemplateAnnotations(task))
+        .addToLabels(getPodLabels(taskRunnerConfig))
+        .endMetadata()
+        .editSpec()
+        .editFirstContainer()
+        .addAllToEnv(getEnv())
+        .endContainer()
+        .endSpec()
+        .endTemplate()
+        
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
+        .withBackoffLimit(0)  // druid does not support an external system 
retrying failed tasks
+        .withTtlSecondsAfterFinished((int) 
taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+        .endSpec()
+        .build();
+  }
+
+  /**
+   * Transform a {@link Pod} to a {@link Task}
+   *
+   * 1. Find task annotation on the pod
+   * 2. Base 64 decode and decompress task, read into {@link Task}
+   *
+   * @param from
+   * @return {@link Task}
+   * @throws IOException
+   */
+  @Override
+  public Task toTask(Pod from) throws IOException
+  {
+    Map<String, String> annotations = from.getMetadata().getAnnotations();
+    if (annotations == null) {
+      throw new IOE("No annotations found on pod [%s]", 
from.getMetadata().getName());
+    }
+    String task = annotations.get(DruidK8sConstants.TASK);
+    if (task == null) {
+      throw new IOE("No task annotation found on pod [%s]", 
from.getMetadata().getName());
+    }
+    return mapper.readValue(Base64Compression.decompressBase64(task), 
Task.class);
+  }
+
+  private HashMap<String, PodTemplate> initializePodTemplates(Properties 
properties)
+  {
+    HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
+    Optional<PodTemplate> basePodTemplate = loadPodTemplate("base", 
properties);
+    if (!basePodTemplate.isPresent()) {
+      throw new IAE("Pod template task adapter requires a base pod template to 
be specified");
+    }
+    podTemplateMap.put("base", basePodTemplate.get());
+
+    MapperConfig config = mapper.getDeserializationConfig();
+    AnnotatedClass cls = 
AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class);
+    Collection<NamedType> taskSubtypes = 
mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls);
+    for (NamedType namedType : taskSubtypes) {
+      String taskType = namedType.getName();
+      Optional<PodTemplate> template = loadPodTemplate(taskType, properties);
+      template.ifPresent(podTemplate -> podTemplateMap.put(taskType, 
podTemplate));
+    }
+    return podTemplateMap;
+  }
+
+  private Optional<PodTemplate> loadPodTemplate(String key, Properties 
properties)
+  {
+    String property = StringUtils.format(TASK_PROPERTY, key);
+    String podTemplateFile = properties.getProperty(property);
+    if (podTemplateFile == null) {
+      log.debug("Pod template file not specified for [%s]", key);
+      return Optional.empty();
+    }
+    try {
+      return Optional.of(client.executeRequest(client -> 
client.v1().podTemplates().load(new File(podTemplateFile)).get()));
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", 
property, podTemplateFile);
+    }
+  }
+
+  private Collection<EnvVar> getEnv()
+  {
+    return ImmutableList.of(
+        new EnvVarBuilder()
+            .withName(DruidK8sConstants.TASK_DIR_ENV)
+            .withValue(new 
File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath())
+            .build(),
+        new EnvVarBuilder()
+            .withName(DruidK8sConstants.TASK_JSON_ENV)
+            .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new 
ObjectFieldSelector(
+                null,
+                StringUtils.format("metadata.annotations['%s']", 
DruidK8sConstants.TASK)
+            )).build()).build()
+    );
+  }
+
+  private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig config)
+  {
+    return getJobLabels(config);
+  }
+
+  private Map<String, String> getPodTemplateAnnotations(Task task) throws 
IOException
+  {
+    return ImmutableMap.<String, String>builder()
+        .put(DruidK8sConstants.TASK, 
Base64Compression.compressBase64(mapper.writeValueAsString(task)))
+        .put(DruidK8sConstants.TLS_ENABLED, 
String.valueOf(node.isEnableTlsPort()))
+        .put(DruidK8sConstants.TASK_ID, task.getId())
+        .put(DruidK8sConstants.TASK_TYPE, task.getType())
+        .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
+        .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
+        .build();
+  }
+
+  private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config)
+  {
+    return ImmutableMap.<String, String>builder()
+        .putAll(config.labels)
+        .put(DruidK8sConstants.LABEL_KEY, "true")
+        .build();
+  }
+
+  private Map<String, String> getJobAnnotations(KubernetesTaskRunnerConfig 
config, Task task)
+  {
+    return ImmutableMap.<String, String>builder()
+        .putAll(config.annotations)
+        .put(DruidK8sConstants.TASK_ID, task.getId())
+        .put(DruidK8sConstants.TASK_TYPE, task.getType())
+        .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
+        .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
+        .build();
+  }
+}
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
index c6e9d049f6..d4da2987b5 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
@@ -23,8 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -32,13 +35,18 @@ import java.util.Map;
 
 public class SingleContainerTaskAdapter extends K8sTaskAdapter
 {
+  public static String TYPE = "SingleContainer";
+
   public SingleContainerTaskAdapter(
       KubernetesClientApi client,
-      KubernetesTaskRunnerConfig config,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      StartupLoggingConfig startupLoggingConfig,
+      DruidNode druidNode,
       ObjectMapper mapper
   )
   {
-    super(client, config, mapper);
+    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper);
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
index 5033f4af12..00263fdc3b 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;
+  Job fromTask(Task task) throws IOException;
 
-  Task toTask(K from) throws IOException;
+  Task toTask(Pod from) throws IOException;
 
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
new file mode 100644
index 0000000000..4c3a45706d
--- /dev/null
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerFactoryTest
+{
+  private ObjectMapper objectMapper;
+  private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskQueueConfig taskQueueConfig;
+  private TaskLogPusher taskLogPusher;
+  private DruidNode druidNode;
+  private TaskConfig taskConfig;
+
+  @Before
+  public void setup()
+  {
+    objectMapper = new TestUtils().getTestObjectMapper();
+    kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskQueueConfig = new TaskQueueConfig(
+        1,
+        null,
+        null,
+        null
+    );
+    taskLogPusher = new NoopTaskLogs();
+    druidNode = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        true,
+        false
+    );
+    taskConfig = new TaskConfig(
+        "/tmp",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("/tmp")
+    );
+  }
+
+  @Test
+  public void test_get_returnsSameKuberentesTaskRunner_asBuild()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner expectedRunner = factory.build();
+    KubernetesTaskRunner actualRunner = factory.get();
+
+    Assert.assertEquals(expectedRunner, actualRunner);
+  }
+
+  @Test
+  public void 
test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+
+    Assert.assertNotNull(runner);
+    Assert.assertTrue(runner.adapter instanceof SingleContainerTaskAdapter);
+  }
+
+  @Test
+  public void 
test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
+  {
+    kubernetesTaskRunnerConfig.sidecarSupport = true;
+
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+
+    Assert.assertNotNull(runner);
+    Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
+  }
+}
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index a4d8ae6360..6914f79d13 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -53,7 +52,6 @@ import org.apache.druid.k8s.overlord.common.JobResponse;
 import org.apache.druid.k8s.overlord.common.K8sTaskAdapter;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
-import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
@@ -83,16 +81,15 @@ import static org.mockito.Mockito.when;
 
 public class KubernetesTaskRunnerTest
 {
-
   private TaskQueueConfig taskQueueConfig;
   private StartupLoggingConfig startupLoggingConfig;
   private ObjectMapper jsonMapper;
   private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
-  private TaskConfig taskConfig;
   private TaskLogPusher taskLogPusher;
-  private DruidNode node;
+  private DruidNode druidNode;
 
-  public KubernetesTaskRunnerTest()
+  @Before
+  public void setUp()
   {
     TestUtils utils = new TestUtils();
     jsonMapper = utils.getTestObjectMapper();
@@ -103,36 +100,14 @@ public class KubernetesTaskRunnerTest
         new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
-  }
-
-  @Before
-  public void setUp()
-  {
-    taskConfig = new TaskConfig(
-        "src/test/resources",
-        "src/test/resources",
-        null,
-        null,
-        null,
-        false,
-        null,
-        null,
-        null,
-        false,
-        false,
-        null,
-        null,
-        false,
-        null
-    );
     kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
     kubernetesTaskRunnerConfig.namespace = "test";
     kubernetesTaskRunnerConfig.javaOptsArray = 
Collections.singletonList("-Xmx2g");
     taskQueueConfig = new TaskQueueConfig(1, Period.millis(1), 
Period.millis(1), Period.millis(1));
     startupLoggingConfig = new StartupLoggingConfig();
     taskLogPusher = mock(TaskLogPusher.class);
-    node = mock(DruidNode.class);
-    when(node.isEnableTlsPort()).thenReturn(false);
+    druidNode = mock(DruidNode.class);
+    when(druidNode.isEnableTlsPort()).thenReturn(false);
   }
 
   @Test
@@ -157,7 +132,7 @@ public class KubernetesTaskRunnerTest
     when(peonPod.getStatus()).thenReturn(podStatus);
 
     K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
-    when(adapter.fromTask(eq(task), any())).thenReturn(job);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
 
     DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
 
@@ -171,14 +146,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        peonClient,
-        node,
-        taskConfig
+        peonClient
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
 
@@ -213,7 +185,7 @@ public class KubernetesTaskRunnerTest
     when(peonPod.getStatus()).thenReturn(podStatus);
 
     K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
-    when(adapter.fromTask(eq(task), 
isA(PeonCommandContext.class))).thenReturn(job);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
 
     DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
 
@@ -228,14 +200,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        peonClient,
-        node,
-        taskConfig
+        peonClient
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
 
@@ -249,7 +218,7 @@ public class KubernetesTaskRunnerTest
         peonPod.getStatus().getPodIP(),
         DruidK8sConstants.PORT,
         DruidK8sConstants.TLS_PORT,
-        node.isEnableTlsPort()
+        druidNode.isEnableTlsPort()
     );
     verify(spyRunner, times(1)).updateStatus(eq(task), 
eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
   }
@@ -279,7 +248,7 @@ public class KubernetesTaskRunnerTest
     when(peonPod.getStatus()).thenReturn(podStatus);
 
     K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
-    when(adapter.fromTask(eq(task), 
isA(PeonCommandContext.class))).thenReturn(job);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
     when(adapter.toTask(eq(peonPod))).thenReturn(task);
 
     DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
@@ -296,14 +265,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        peonClient,
-        node,
-        taskConfig
+        peonClient
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
     Collection<? extends TaskRunnerWorkItem> workItems = 
spyRunner.getKnownTasks();
@@ -344,7 +310,7 @@ public class KubernetesTaskRunnerTest
     when(peonPod.getStatus()).thenReturn(podStatus);
 
     K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
-    when(adapter.fromTask(eq(task), 
isA(PeonCommandContext.class))).thenReturn(job);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
     when(adapter.toTask(eq(peonPod))).thenReturn(task);
 
     DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
@@ -361,14 +327,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        peonClient,
-        node,
-        taskConfig
+        peonClient
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
     Collection<? extends TaskRunnerWorkItem> workItems = 
spyRunner.getKnownTasks();
@@ -404,14 +367,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        startupLoggingConfig,
         mock(K8sTaskAdapter.class),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        peonClient,
-        node,
-        taskConfig
+        peonClient
     );
 
     RunnerTaskState state = taskRunner.getRunnerTaskState("foo");
@@ -434,14 +394,11 @@ public class KubernetesTaskRunnerTest
         Period.millis(1)
     );
     assertThrows(IllegalArgumentException.class, () -> new 
KubernetesTaskRunner(
-        startupLoggingConfig,
         mock(K8sTaskAdapter.class),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
         taskLogPusher,
-        mock(DruidKubernetesPeonClient.class),
-        node,
-        taskConfig
+        mock(DruidKubernetesPeonClient.class)
     ));
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
index 72acaf42b1..72cd8d41ec 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -29,10 +30,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -56,11 +61,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 // must have a kind / minikube cluster installed and the image pushed to your 
repository
 public class DruidPeonClientIntegrationTest
 {
-  private final KubernetesClientApi k8sClient;
-  private final DruidKubernetesPeonClient peonClient;
-  private final ObjectMapper jsonMapper;
-
-  public DruidPeonClientIntegrationTest()
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskConfig taskConfig;
+  private DruidNode druidNode;
+  private KubernetesClientApi k8sClient;
+  private DruidKubernetesPeonClient peonClient;
+  private ObjectMapper jsonMapper;
+
+  @BeforeEach
+  public void setup()
   {
     TestUtils utils = new TestUtils();
     jsonMapper = utils.getTestObjectMapper();
@@ -73,6 +82,33 @@ public class DruidPeonClientIntegrationTest
     );
     k8sClient = new DruidKubernetesClient();
     peonClient = new DruidKubernetesPeonClient(k8sClient, "default", false);
+    druidNode = new DruidNode(
+        "test",
+        null,
+        false,
+        null,
+        null,
+        true,
+        false
+    );
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskConfig = new TaskConfig(
+        "src/test/resources",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("src/test/resources")
+    );
   }
 
   @Disabled
@@ -84,7 +120,14 @@ public class DruidPeonClientIntegrationTest
     Task task = K8sTestUtils.getTask();
     KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
     config.namespace = "default";
-    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(k8sClient, config, 
jsonMapper);
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        k8sClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        druidNode,
+        jsonMapper
+    );
     String taskBasePath = "/home/taskDir";
     PeonCommandContext context = new 
PeonCommandContext(Collections.singletonList(
         "sleep 10;  for i in `seq 1 1000`; do echo $i; done; exit 0"
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
index 59ef5b430c..fe9775868e 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.fabric8.kubernetes.api.model.Container;
@@ -36,11 +37,14 @@ import 
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -58,8 +62,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @EnableKubernetesMockClient(crud = true)
 class K8sTaskAdapterTest
 {
-  KubernetesClient client;
+  private KubernetesClient client;
 
+  private final StartupLoggingConfig startupLoggingConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
   private ObjectMapper jsonMapper;
 
   public K8sTaskAdapterTest()
@@ -73,6 +80,33 @@ class K8sTaskAdapterTest
         new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
+    node = new DruidNode(
+        "test",
+        null,
+        false,
+        null,
+        null,
+        true,
+        false
+    );
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskConfig = new TaskConfig(
+        "src/test/resources",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("src/test/resources")
+    );
   }
 
   @Test
@@ -83,7 +117,14 @@ class K8sTaskAdapterTest
     config.namespace = "test";
     config.annotations.put("annotation_key", "annotation_value");
     config.labels.put("label_key", "label_value");
-    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, 
config, jsonMapper);
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper
+    );
     Task task = K8sTestUtils.getTask();
     Job jobFromSpec = adapter.createJobFromPodSpec(
         K8sTestUtils.getDummyPodSpec(),
@@ -105,7 +146,14 @@ class K8sTaskAdapterTest
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
     config.namespace = "test";
-    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, 
config, jsonMapper);
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper
+    );
     Task task = K8sTestUtils.getTask();
     Job jobFromSpec = adapter.createJobFromPodSpec(
         K8sTestUtils.getDummyPodSpec(),
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
index f1c7b390de..cba4051873 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -29,11 +30,15 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -44,12 +49,14 @@ import java.util.Collections;
 @EnableKubernetesMockClient(crud = true)
 class MultiContainerTaskAdapterTest
 {
-
-  KubernetesClient client;
-
+  private KubernetesClient client;
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskConfig taskConfig;
+  private DruidNode druidNode;
   private ObjectMapper jsonMapper;
 
-  public MultiContainerTaskAdapterTest()
+  @BeforeEach
+  public void setup()
   {
     TestUtils utils = new TestUtils();
     jsonMapper = utils.getTestObjectMapper();
@@ -60,6 +67,33 @@ class MultiContainerTaskAdapterTest
         new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
+    druidNode = new DruidNode(
+        "test",
+        null,
+        false,
+        null,
+        null,
+        true,
+        false
+    );
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskConfig = new TaskConfig(
+        "src/test/resources",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("src/test/resources")
+    );
   }
 
   @Test
@@ -69,7 +103,14 @@ class MultiContainerTaskAdapterTest
     Pod pod = 
client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get();
     KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
     config.namespace = "test";
-    MultiContainerTaskAdapter adapter = new 
MultiContainerTaskAdapter(testClient, config, jsonMapper);
+    MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        druidNode,
+        jsonMapper
+    );
     NoopTask task = NoopTask.create("id", 1);
     Job actual = adapter.createJobFromPodSpec(
         pod.getSpec(),
@@ -113,7 +154,14 @@ class MultiContainerTaskAdapterTest
     KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
     config.namespace = "test";
     config.primaryContainerName = "primary";
-    MultiContainerTaskAdapter adapter = new 
MultiContainerTaskAdapter(testClient, config, jsonMapper);
+    MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        druidNode,
+        jsonMapper
+    );
     NoopTask task = NoopTask.create("id", 1);
     PodSpec spec = pod.getSpec();
     K8sTaskAdapter.massageSpec(spec, "primary");
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
new file mode 100644
index 0000000000..7c9dad4607
--- /dev/null
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * Unit tests for {@link PodTemplateTaskAdapter}.
+ *
+ * The tests cover three areas: constructing the adapter from runtime properties that point at pod template
+ * files, converting a {@link Task} into a Kubernetes {@link Job} with {@code fromTask}, and recovering a
+ * {@link Task} from a {@link Pod} with {@code toTask}.
+ *
+ * Test method names are kept exactly as originally written (including the "Properites" spelling) so that
+ * existing test filters keep matching.
+ */
+@EnableKubernetesMockClient()
+public class PodTemplateTaskAdapterTest
+{
+  /** Runtime property naming the file that holds the base pod template. */
+  private static final String BASE_TEMPLATE_PROPERTY = "druid.indexer.runner.k8s.podTemplate.base";
+  /** Runtime property naming the file that holds the noop pod template. */
+  private static final String NOOP_TEMPLATE_PROPERTY = "druid.indexer.runner.k8s.podTemplate.noop";
+
+  @TempDir private Path tempDir;
+  // Never assigned in this class; presumably injected by the @EnableKubernetesMockClient extension.
+  private KubernetesClient client;
+  private KubernetesTaskRunnerConfig taskRunnerConfig;
+  private TestKubernetesClient testClient;
+  private PodTemplate podTemplateSpec;
+  private TaskConfig taskConfig;
+  private DruidNode node;
+  private ObjectMapper mapper;
+
+  /**
+   * Rebuilds the collaborators handed to every adapter under test and loads the pod template fixture
+   * ({@code basePodTemplate.yaml}) that the tests later write out to temporary files.
+   */
+  @BeforeEach
+  public void setup()
+  {
+    taskRunnerConfig = new KubernetesTaskRunnerConfig();
+    testClient = new TestKubernetesClient(client);
+    taskConfig = new TaskConfig(
+        "/tmp",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("/tmp")
+    );
+    node = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        true,
+        false
+    );
+    mapper = new TestUtils().getTestObjectMapper();
+    podTemplateSpec = client
+        .v1()
+        .podTemplates()
+        .load(this.getClass().getClassLoader().getResourceAsStream("basePodTemplate.yaml"))
+        .get();
+  }
+
+  /**
+   * Constructing the adapter with no pod template properties at all must fail with an {@link IAE}.
+   */
+  @Test
+  public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
+  {
+    // NOTE(review): the String argument of JUnit 4 assertThrows is the message reported when the assertion
+    // fails; it is not compared against the thrown exception's message.
+    Assert.assertThrows(
+        "Pod template task adapter requires a base pod template to be specified",
+        IAE.class,
+        () -> createAdapter(node, new Properties())
+    );
+  }
+
+  /**
+   * Pointing the base pod template property at an empty file must fail with an {@link ISE}.
+   */
+  @Test
+  public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_raisesISE() throws IOException
+  {
+    Path emptyTemplate = Files.createFile(tempDir.resolve("empty.yaml"));
+    Properties props = basePodTemplateProperties(emptyTemplate);
+
+    // NOTE(review): same failure-message text as the IAE test above; confirm it is the intended wording
+    // for the empty-file case.
+    Assert.assertThrows(
+        "Pod template task adapter requires a base pod template to be specified",
+        ISE.class,
+        () -> createAdapter(node, props)
+    );
+  }
+
+  /**
+   * With only a base pod template configured, a noop task is converted into the job described by
+   * {@code expectedNoopJob.yaml}.
+   */
+  @Test
+  public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    PodTemplateTaskAdapter adapter = createAdapter(node, props);
+
+    Job actual = adapter.fromTask(createNoopTask());
+    Job expected = loadJob("expectedNoopJob.yaml");
+
+    Assertions.assertEquals(expected, actual);
+  }
+
+  /**
+   * Same as the base template test, but with a node whose last two constructor flags are
+   * {@code (false, true)} instead of {@code (true, false)}. The expected job
+   * ({@code expectedNoopJobTlsEnabled.yaml}) carries the annotation {@code tls.enabled: "true"}.
+   */
+  @Test
+  public void test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    DruidNode tlsNode = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        false,
+        true
+    );
+    PodTemplateTaskAdapter adapter = createAdapter(tlsNode, props);
+
+    Job actual = adapter.fromTask(createNoopTask());
+    Job expected = loadJob("expectedNoopJobTlsEnabled.yaml");
+
+    Assertions.assertEquals(expected, actual);
+  }
+
+  /**
+   * A valid base template combined with a noop template property that points at an empty file must fail
+   * with an {@link ISE}.
+   */
+  @Test
+  public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_raisesISE() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    Path emptyNoopTemplate = Files.createFile(tempDir.resolve("noop.yaml"));
+    props.setProperty(NOOP_TEMPLATE_PROPERTY, emptyNoopTemplate.toString());
+
+    Assert.assertThrows(ISE.class, () -> createAdapter(node, props));
+  }
+
+  /**
+   * With both the base and the noop template properties pointing at the same valid template file, a noop
+   * task is converted into the job described by {@code expectedNoopJob.yaml}.
+   */
+  @Test
+  public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOException
+  {
+    Path templatePath = writePodTemplate("noop.yaml");
+    Properties props = basePodTemplateProperties(templatePath);
+    props.setProperty(NOOP_TEMPLATE_PROPERTY, templatePath.toString());
+    PodTemplateTaskAdapter adapter = createAdapter(node, props);
+
+    Job actual = adapter.fromTask(createNoopTask());
+    Job expected = loadJob("expectedNoopJob.yaml");
+
+    Assertions.assertEquals(expected, actual);
+  }
+
+  /**
+   * {@code toTask} on the pod from {@code basePodWithoutAnnotations.yaml} must fail with an {@link IOE}.
+   * Despite the method name, this test exercises {@code toTask}, not {@code fromTask}.
+   */
+  @Test
+  public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    PodTemplateTaskAdapter adapter = createAdapter(node, props);
+
+    Pod pod = loadPod("basePodWithoutAnnotations.yaml");
+
+    Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
+  }
+
+  /**
+   * {@code toTask} on a pod that has had an empty annotation map added to its metadata must fail with an
+   * {@link IOE}. Despite the method name, this test exercises {@code toTask}, not {@code fromTask}.
+   */
+  @Test
+  public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    PodTemplateTaskAdapter adapter = createAdapter(node, props);
+
+    Pod basePod = loadPod("basePodWithoutAnnotations.yaml");
+    Pod pod = new PodBuilder(basePod)
+        .editMetadata()
+        .addToAnnotations(Collections.emptyMap())
+        .endMetadata()
+        .build();
+
+    Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
+  }
+
+  /**
+   * {@code toTask} on the pod from {@code basePod.yaml} must return a task equal to
+   * {@code NoopTask.create("id", 1)}. Despite the method name, this test exercises {@code toTask}.
+   */
+  @Test
+  public void test_fromTask() throws IOException
+  {
+    Properties props = basePodTemplateProperties(writePodTemplate("base.yaml"));
+    PodTemplateTaskAdapter adapter = createAdapter(node, props);
+
+    Pod pod = loadPod("basePod.yaml");
+
+    Task actual = adapter.toTask(pod);
+    Task expected = NoopTask.create("id", 1);
+
+    Assertions.assertEquals(expected, actual);
+  }
+
+  /**
+   * Creates {@code fileName} inside the per-test temporary directory and serializes
+   * {@link #podTemplateSpec} into it with the test object mapper.
+   *
+   * @return path of the written template file
+   */
+  private Path writePodTemplate(String fileName) throws IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve(fileName));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+    return templatePath;
+  }
+
+  /**
+   * Returns fresh runtime properties whose base pod template entry points at {@code basePodTemplate}.
+   */
+  private static Properties basePodTemplateProperties(Path basePodTemplate)
+  {
+    Properties props = new Properties();
+    // setProperty rather than put: keeps keys and values restricted to Strings.
+    props.setProperty(BASE_TEMPLATE_PROPERTY, basePodTemplate.toString());
+    return props;
+  }
+
+  /**
+   * Builds the adapter under test from the shared fixtures, the given node and the given runtime properties.
+   */
+  private PodTemplateTaskAdapter createAdapter(DruidNode druidNode, Properties props)
+  {
+    return new PodTemplateTaskAdapter(
+        testClient,
+        taskRunnerConfig,
+        taskConfig,
+        druidNode,
+        mapper,
+        props
+    );
+  }
+
+  /**
+   * Returns the noop task that every {@code fromTask} test converts into a job.
+   */
+  private static Task createNoopTask()
+  {
+    return new NoopTask(
+        "id",
+        "id",
+        "datasource",
+        0,
+        0,
+        null,
+        null,
+        null
+    );
+  }
+
+  /**
+   * Loads a {@link Job} fixture from the test classpath through the mock client.
+   */
+  private Job loadJob(String resourceName)
+  {
+    return client
+        .batch()
+        .v1()
+        .jobs()
+        .load(this.getClass().getClassLoader().getResourceAsStream(resourceName))
+        .get();
+  }
+
+  /**
+   * Loads a {@link Pod} fixture from the test classpath through the mock client.
+   */
+  private Pod loadPod(String resourceName)
+  {
+    return client
+        .pods()
+        .load(this.getClass().getClassLoader().getResourceAsStream(resourceName))
+        .get();
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
index c442b13f6e..9005aa086a 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
@@ -22,17 +22,22 @@ package org.apache.druid.k8s.overlord.common;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -43,12 +48,14 @@ import java.util.Collections;
 @EnableKubernetesMockClient(crud = true)
 class SingleContainerTaskAdapterTest
 {
-
-  KubernetesClient client;
-
+  private KubernetesClient client;
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskConfig taskConfig;
+  private DruidNode druidNode;
   private ObjectMapper jsonMapper;
 
-  public SingleContainerTaskAdapterTest()
+  @BeforeEach
+  public void setup()
   {
     TestUtils utils = new TestUtils();
     jsonMapper = utils.getTestObjectMapper();
@@ -59,6 +66,33 @@ class SingleContainerTaskAdapterTest
         new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
+    druidNode = new DruidNode(
+        "test",
+        null,
+        false,
+        null,
+        null,
+        true,
+        false
+    );
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskConfig = new TaskConfig(
+        "src/test/resources",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("src/test/resources")
+    );
   }
 
   @Test
@@ -70,7 +104,14 @@ class SingleContainerTaskAdapterTest
                     .get();
     KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
     config.namespace = "test";
-    SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper);
+    SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        druidNode,
+        jsonMapper
+    );
     NoopTask task = NoopTask.create("id", 1);
     Job actual = adapter.createJobFromPodSpec(
         pod.getSpec(),
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml
new file mode 100644
index 0000000000..5c8c4f7855
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml
@@ -0,0 +1,23 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: "id-kmwkw"
+  labels:
+    job-name: "id"
+    druid.k8s.peons: "true"
+  annotations:
+    task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
+spec:
+  containers:
+    - command:
+        - sleep
+        - "3600"
+      env:
+        - name: "TASK_DIR"
+          value: "/tmp/id"
+        - name: "TASK_JSON"
+          valueFrom:
+            fieldRef:
+              fieldPath: "metadata.annotations['task']"
+      image: one
+      name: primary
\ No newline at end of file
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml
new file mode 100644
index 0000000000..81e7d44154
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: PodTemplate
+metadata:
+  name: test
+template:
+  spec:
+    containers:
+      - command:
+          - sleep
+          - "3600"
+        image: one
+        name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml
new file mode 100644
index 0000000000..c56ba7c9b6
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml
@@ -0,0 +1,21 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: "id-kmwkw"
+  labels:
+    job-name: "id"
+  annotations:
+spec:
+  containers:
+    - command:
+        - sleep
+        - "3600"
+      env:
+        - name: "TASK_DIR"
+          value: "/tmp/id"
+        - name: "TASK_JSON"
+          valueFrom:
+            fieldRef:
+              fieldPath: "metadata.annotations['task']"
+      image: one
+      name: primary
\ No newline at end of file
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
new file mode 100644
index 0000000000..13b2e0922c
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -0,0 +1,40 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: "id"
+  labels:
+    druid.k8s.peons: "true"
+  annotations:
+    task.id: "id"
+    task.type: "noop"
+    task.group.id: "id"
+    task.datasource: "datasource"
+spec:
+  activeDeadlineSeconds: 14400
+  backoffLimit: 0
+  ttlSecondsAfterFinished: 172800
+  template:
+    metadata:
+      labels:
+        druid.k8s.peons: "true"
+      annotations:
+        task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+        tls.enabled: "false"
+        task.id: "id"
+        task.type: "noop"
+        task.group.id: "id"
+        task.datasource: "datasource"
+    spec:
+      containers:
+        - command:
+            - sleep
+            - "3600"
+          env:
+            - name: "TASK_DIR"
+              value: "/tmp"
+            - name: "TASK_JSON"
+              valueFrom:
+                fieldRef:
+                  fieldPath: "metadata.annotations['task']"
+          image: one
+          name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
new file mode 100644
index 0000000000..1d05d6ad93
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -0,0 +1,40 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: "id"
+  labels:
+    druid.k8s.peons: "true"
+  annotations:
+    task.id: "id"
+    task.type: "noop"
+    task.group.id: "id"
+    task.datasource: "datasource"
+spec:
+  activeDeadlineSeconds: 14400
+  backoffLimit: 0
+  ttlSecondsAfterFinished: 172800
+  template:
+    metadata:
+      labels:
+        druid.k8s.peons: "true"
+      annotations:
+        task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+        tls.enabled: "true"
+        task.id: "id"
+        task.type: "noop"
+        task.group.id: "id"
+        task.datasource: "datasource"
+    spec:
+      containers:
+        - command:
+            - sleep
+            - "3600"
+          env:
+            - name: "TASK_DIR"
+              value: "/tmp"
+            - name: "TASK_JSON"
+              valueFrom:
+                fieldRef:
+                  fieldPath: "metadata.annotations['task']"
+          image: one
+          name: primary


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to