This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 64754b6799a Allow users to pass task payload via deep storage instead 
of environment variable (#14887)
64754b6799a is described below

commit 64754b6799afcebc0943a18e3099069717fbd077
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Oct 3 04:38:59 2023 -0400

    Allow users to pass task payload via deep storage instead of environment 
variable (#14887)
    
    This change fixes an issue where passing too large a task payload to the 
mm-less task runner causes the peon to fail to start up, because the payload 
is passed (compressed) as an environment variable (TASK_JSON). On Linux 
systems the limit for an environment variable is commonly 128 KB; on Windows 
systems it is lower. Setting an environment variable longer than this limit 
results in "Argument list too long" errors.
---
 distribution/docker/peon.sh                        |   5 +-
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  31 ++-
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |   3 +-
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  10 +-
 .../k8s/overlord/common/DruidK8sConstants.java     |   1 +
 .../k8s/overlord/taskadapter/K8sTaskAdapter.java   |  69 ++++++-
 .../taskadapter/MultiContainerTaskAdapter.java     |   6 +-
 .../taskadapter/PodTemplateTaskAdapter.java        |  87 +++++++--
 .../taskadapter/SingleContainerTaskAdapter.java    |   6 +-
 .../k8s/overlord/taskadapter/TaskAdapter.java      |   7 +
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  |  56 +++++-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |   8 +-
 .../DruidPeonClientIntegrationTest.java            |   3 +-
 .../overlord/taskadapter/K8sTaskAdapterTest.java   | 211 +++++++++++++++++++--
 .../taskadapter/MultiContainerTaskAdapterTest.java |  25 ++-
 .../taskadapter/PodTemplateTaskAdapterTest.java    | 193 +++++++++++++++++--
 .../SingleContainerTaskAdapterTest.java            |   7 +-
 .../src/test/resources/expectedNoopJob.yaml        |   4 +-
 .../src/test/resources/expectedNoopJobLongIds.yaml |   4 +-
 ...NoopJob.yaml => expectedNoopJobNoTaskJson.yaml} |   5 -
 .../test/resources/expectedNoopJobTlsEnabled.yaml  |   4 +-
 .../org/apache/druid/storage/s3/S3TaskLogs.java    |  15 ++
 .../apache/druid/storage/s3/S3TaskLogsTest.java    |  76 ++++++++
 .../druid/guice/IndexingServiceTaskLogsModule.java |   2 +
 .../apache/druid/error/InternalServerError.java    |  62 ++++++
 .../org/apache/druid/tasklogs/NoopTaskLogs.java    |  12 ++
 .../java/org/apache/druid/tasklogs/TaskLogs.java   |   3 +-
 .../apache/druid/tasklogs/TaskPayloadManager.java  |  57 ++++++
 .../druid/error/InternalServerErrorTest.java       |  59 ++++++
 .../apache/druid/tasklogs/NoopTaskLogsTest.java    |   7 +
 ...skLogsTest.java => TaskPayloadManagerTest.java} |  20 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |  15 +-
 32 files changed, 967 insertions(+), 106 deletions(-)

diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh
index 66e34c99744..b5ec89ea8d4 100755
--- a/distribution/docker/peon.sh
+++ b/distribution/docker/peon.sh
@@ -149,7 +149,8 @@ then
     mkdir -p ${DRUID_DIRS_TO_CREATE}
 fi
 
-# take the ${TASK_JSON} environment variable and base64 decode, unzip and 
throw it in ${TASK_DIR}/task.json
-mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > 
${TASK_DIR}/task.json;
+# take the ${TASK_JSON} environment variable and base64 decode, unzip and 
throw it in ${TASK_DIR}/task.json.
+# If TASK_JSON is not set, CliPeon will pull the task.json file from deep 
storage.
+mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | 
gzip -d > ${TASK_DIR}/task.json;
 
 exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: 
org.apache.druid.cli.Main internal peon $@
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 4814d8cbb60..5c6c7c6b3eb 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -42,6 +42,7 @@ import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -71,9 +72,9 @@ public class KubernetesPeonLifecycle
 
   protected enum State
   {
-    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link 
#join(long)} is called. */
+    /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or 
{@link #join(long)} is called. */
     NOT_STARTED,
-    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    /** Lifecycle's state since {@link #run(Job, long, long, boolean)} is 
called. */
     PENDING,
     /** Lifecycle's state since {@link #join(long)} is called. */
     RUNNING,
@@ -88,7 +89,6 @@ public class KubernetesPeonLifecycle
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
   private final TaskStateListener stateListener;
-
   @MonotonicNonNull
   private LogWatch logWatch;
 
@@ -119,11 +119,15 @@ public class KubernetesPeonLifecycle
    * @return
    * @throws IllegalStateException
    */
-  protected synchronized TaskStatus run(Job job, long launchTimeout, long 
timeout) throws IllegalStateException
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long 
timeout, boolean useDeepStorageForTaskPayload) throws IllegalStateException, 
IOException
   {
     try {
       updateState(new State[]{State.NOT_STARTED}, State.PENDING);
 
+      if (useDeepStorageForTaskPayload) {
+        writeTaskPayload(task);
+      }
+
       // In case something bad happens and run is called twice on this 
KubernetesPeonLifecycle, reset taskLocation.
       taskLocation = null;
       kubernetesClient.launchPeonJobAndWaitForStart(
@@ -144,6 +148,25 @@ public class KubernetesPeonLifecycle
     }
   }
 
+  private void writeTaskPayload(Task task) throws IOException
+  {
+    Path file = null;
+    try {
+      file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
+      FileUtils.writeStringToFile(file.toFile(), 
mapper.writeValueAsString(task), Charset.defaultCharset());
+      taskLogs.pushTaskPayload(task.getId(), file.toFile());
+    }
+    catch (Exception e) {
+      log.error("Failed to write task payload for task: %s", 
taskId.getOriginalTaskId());
+      throw new RuntimeException(e);
+    }
+    finally {
+      if (file != null) {
+        Files.deleteIfExists(file);
+      }
+    }
+  }
+
   /**
    * Join existing Kubernetes Job
    *
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 33efd848d0a..a0a29dcbbb9 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -193,7 +193,8 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         taskStatus = peonLifecycle.run(
             adapter.fromTask(task),
             config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
-            config.getTaskTimeout().toStandardDuration().getMillis()
+            config.getTaskTimeout().toStandardDuration().getMillis(),
+            adapter.shouldUseDeepStorageForTaskPayload(task)
         );
       } else {
         taskStatus = peonLifecycle.join(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 92fc220e621..72d7ef0c00d 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
   private final ServiceEmitter emitter;
   private KubernetesTaskRunner runner;
 
-
   @Inject
   public KubernetesTaskRunnerFactory(
       @Smile ObjectMapper smileMapper,
@@ -137,7 +136,8 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
           taskConfig,
           startupLoggingConfig,
           druidNode,
-          smileMapper
+          smileMapper,
+          taskLogs
       );
     } else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
       return new PodTemplateTaskAdapter(
@@ -145,7 +145,8 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
           taskConfig,
           druidNode,
           smileMapper,
-          properties
+          properties,
+          taskLogs
       );
     } else {
       return new SingleContainerTaskAdapter(
@@ -154,7 +155,8 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
           taskConfig,
           startupLoggingConfig,
           druidNode,
-          smileMapper
+          smileMapper,
+          taskLogs
       );
     }
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index a5380bef3f2..7d35827f89b 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -40,5 +40,6 @@ public class DruidK8sConstants
   public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";
   public static final String LABEL_KEY = "druid.k8s.peons";
   public static final String DRUID_LABEL_PREFIX = "druid.";
+  public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB
   static final Predicate<Throwable> IS_TRANSIENT = e -> e instanceof 
KubernetesResourceNotFoundException;
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 712bc1a47e2..862b176b115 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -41,7 +41,10 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ForkingTaskRunner;
@@ -57,8 +60,11 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -89,6 +95,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
   protected final StartupLoggingConfig startupLoggingConfig;
   protected final DruidNode node;
   protected final ObjectMapper mapper;
+  protected final TaskLogs taskLogs;
 
   public K8sTaskAdapter(
       KubernetesClientApi client,
@@ -96,7 +103,8 @@ public abstract class K8sTaskAdapter implements TaskAdapter
       TaskConfig taskConfig,
       StartupLoggingConfig startupLoggingConfig,
       DruidNode node,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskLogs taskLogs
   )
   {
     this.client = client;
@@ -105,6 +113,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
     this.startupLoggingConfig = startupLoggingConfig;
     this.node = node;
     this.mapper = mapper;
+    this.taskLogs = taskLogs;
   }
 
   @Override
@@ -132,11 +141,39 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter
     Optional<EnvVar> taskJson = envVars.stream().filter(x -> 
"TASK_JSON".equals(x.getName())).findFirst();
     String contents = taskJson.map(envVar -> 
taskJson.get().getValue()).orElse(null);
     if (contents == null) {
-      throw new IOException("No TASK_JSON environment variable found in pod: " 
+ from.getMetadata().getName());
+      log.info("No TASK_JSON environment variable found in pod: %s. Trying to 
load task payload from deep storage.", from.getMetadata().getName());
+      return toTaskUsingDeepStorage(from);
     }
     return mapper.readValue(Base64Compression.decompressBase64(contents), 
Task.class);
   }
 
+  private Task toTaskUsingDeepStorage(Job from) throws IOException
+  {
+    com.google.common.base.Optional<InputStream> taskBody = 
taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+    if (!taskBody.isPresent()) {
+      throw InternalServerError.exception(
+          "Could not load task payload from deep storage for job [%s]. Check 
the overlord logs for any errors in uploading task payload to deep storage.",
+          from.getMetadata().getName()
+      );
+    }
+    String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+    return mapper.readValue(task, Task.class);
+  }
+
+  @Override
+  public K8sTaskId getTaskId(Job from)
+  {
+    Map<String, String> annotations = 
from.getSpec().getTemplate().getMetadata().getAnnotations();
+    if (annotations == null) {
+      throw DruidException.defensive().build("No annotations found on pod spec 
for job [%s]", from.getMetadata().getName());
+    }
+    String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+    if (taskId == null) {
+      throw DruidException.defensive().build("No task_id annotation found on 
pod spec for job [%s]", from.getMetadata().getName());
+    }
+    return new K8sTaskId(taskId);
+  }
+
   @VisibleForTesting
   abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, 
PeonCommandContext context) throws IOException;
 
@@ -219,15 +256,11 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter
                                      .build());
     }
 
-    mainContainer.getEnv().addAll(Lists.newArrayList(
+    List<EnvVar> envVars = Lists.newArrayList(
         new EnvVarBuilder()
             .withName(DruidK8sConstants.TASK_DIR_ENV)
             .withValue(context.getTaskDir().getAbsolutePath())
             .build(),
-        new EnvVarBuilder()
-            .withName(DruidK8sConstants.TASK_JSON_ENV)
-            .withValue(taskContents)
-            .build(),
         new EnvVarBuilder()
             .withName(DruidK8sConstants.JAVA_OPTS)
             .withValue(Joiner.on(" ").join(context.getJavaOpts()))
@@ -244,7 +277,17 @@ public abstract class K8sTaskAdapter implements TaskAdapter
                 null,
                 "metadata.name"
             )).build()).build()
-    ));
+    );
+
+    if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
+      envVars.add(
+          new EnvVarBuilder()
+              .withName(DruidK8sConstants.TASK_JSON_ENV)
+              .withValue(taskContents)
+              .build()
+      );
+    }
+    mainContainer.getEnv().addAll(envVars);
   }
 
   protected Container setupMainContainer(
@@ -403,6 +446,9 @@ public abstract class K8sTaskAdapter implements TaskAdapter
       command.add("--loadBroadcastSegments");
       command.add("true");
     }
+
+    command.add("--taskId");
+    command.add(task.getId());
     log.info(
         "Peon Command for K8s job: %s",
         
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), 
command)
@@ -433,5 +479,12 @@ public abstract class K8sTaskAdapter implements TaskAdapter
     }
     return requirements;
   }
+
+  @Override
+  public boolean shouldUseDeepStorageForTaskPayload(Task task) throws 
IOException
+  {
+    String compressedTaskPayload = 
Base64Compression.compressBase64(mapper.writeValueAsString(task));
+    return compressedTaskPayload.length() > 
DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+  }
 }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
index 9cda8f86488..a81154a3bcb 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
@@ -43,6 +43,7 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -59,10 +60,11 @@ public class MultiContainerTaskAdapter extends 
K8sTaskAdapter
       TaskConfig taskConfig,
       StartupLoggingConfig startupLoggingConfig,
       DruidNode druidNode,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskLogs taskLogs
   )
   {
-    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper);
+    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper, taskLogs);
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index a3d10f7dcd1..ef0509a673f 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.databind.cfg.MapperConfig;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import io.fabric8.kubernetes.api.model.EnvVar;
 import io.fabric8.kubernetes.api.model.EnvVarBuilder;
 import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
@@ -35,11 +35,13 @@ import io.fabric8.kubernetes.api.model.PodTemplate;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.client.utils.Serialization;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -49,12 +51,16 @@ import 
org.apache.druid.k8s.overlord.common.DruidK8sConstants;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -85,13 +91,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter
   private final DruidNode node;
   private final ObjectMapper mapper;
   private final HashMap<String, PodTemplate> templates;
+  private final TaskLogs taskLogs;
 
   public PodTemplateTaskAdapter(
       KubernetesTaskRunnerConfig taskRunnerConfig,
       TaskConfig taskConfig,
       DruidNode node,
       ObjectMapper mapper,
-      Properties properties
+      Properties properties,
+      TaskLogs taskLogs
   )
   {
     this.taskRunnerConfig = taskRunnerConfig;
@@ -99,6 +107,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
     this.node = node;
     this.mapper = mapper;
     this.templates = initializePodTemplates(properties);
+    this.taskLogs = taskLogs;
   }
 
   /**
@@ -163,15 +172,44 @@ public class PodTemplateTaskAdapter implements TaskAdapter
   {
     Map<String, String> annotations = 
from.getSpec().getTemplate().getMetadata().getAnnotations();
     if (annotations == null) {
-      throw new IOE("No annotations found on pod spec for job [%s]", 
from.getMetadata().getName());
+      log.info("No annotations found on pod spec for job [%s]. Trying to load 
task payload from deep storage.", from.getMetadata().getName());
+      return toTaskUsingDeepStorage(from);
     }
     String task = annotations.get(DruidK8sConstants.TASK);
     if (task == null) {
-      throw new IOE("No task annotation found on pod spec for job [%s]", 
from.getMetadata().getName());
+      log.info("No task annotation found on pod spec for job [%s]. Trying to 
load task payload from deep storage.", from.getMetadata().getName());
+      return toTaskUsingDeepStorage(from);
     }
     return mapper.readValue(Base64Compression.decompressBase64(task), 
Task.class);
   }
 
+  private Task toTaskUsingDeepStorage(Job from) throws IOException
+  {
+    com.google.common.base.Optional<InputStream> taskBody = 
taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+    if (!taskBody.isPresent()) {
+      throw InternalServerError.exception(
+          "Could not load task payload from deep storage for job [%s]. Check 
the overlord logs for errors uploading task payloads to deep storage.",
+          from.getMetadata().getName()
+      );
+    }
+    String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+    return mapper.readValue(task, Task.class);
+  }
+
+  @Override
+  public K8sTaskId getTaskId(Job from)
+  {
+    Map<String, String> annotations = 
from.getSpec().getTemplate().getMetadata().getAnnotations();
+    if (annotations == null) {
+      throw DruidException.defensive().build("No annotations found on pod spec 
for job [%s]", from.getMetadata().getName());
+    }
+    String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+    if (taskId == null) {
+      throw DruidException.defensive().build("No task_id annotation found on 
pod spec for job [%s]", from.getMetadata().getName());
+    }
+    return new K8sTaskId(taskId);
+  }
+
   private HashMap<String, PodTemplate> initializePodTemplates(Properties 
properties)
   {
     HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
@@ -208,9 +246,9 @@ public class PodTemplateTaskAdapter implements TaskAdapter
     }
   }
 
-  private Collection<EnvVar> getEnv(Task task)
+  private Collection<EnvVar> getEnv(Task task) throws IOException
   {
-    return ImmutableList.of(
+    List<EnvVar> envVars = Lists.newArrayList(
         new EnvVarBuilder()
             .withName(DruidK8sConstants.TASK_DIR_ENV)
             .withValue(taskConfig.getBaseDir())
@@ -219,17 +257,21 @@ public class PodTemplateTaskAdapter implements TaskAdapter
             .withName(DruidK8sConstants.TASK_ID_ENV)
             .withValue(task.getId())
             .build(),
-        new EnvVarBuilder()
-            .withName(DruidK8sConstants.TASK_JSON_ENV)
-            .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new 
ObjectFieldSelector(
-                null,
-                StringUtils.format("metadata.annotations['%s']", 
DruidK8sConstants.TASK)
-            )).build()).build(),
         new EnvVarBuilder()
             .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
             .withValue(Boolean.toString(task.supportsQueries()))
             .build()
     );
+    if (!shouldUseDeepStorageForTaskPayload(task)) {
+      envVars.add(new EnvVarBuilder()
+          .withName(DruidK8sConstants.TASK_JSON_ENV)
+          .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new 
ObjectFieldSelector(
+              null,
+              StringUtils.format("metadata.annotations['%s']", 
DruidK8sConstants.TASK)
+          )).build()).build()
+      );
+    }
+    return envVars;
   }
 
   private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig config, 
Task task)
@@ -239,14 +281,18 @@ public class PodTemplateTaskAdapter implements TaskAdapter
 
   private Map<String, String> getPodTemplateAnnotations(Task task) throws 
IOException
   {
-    return ImmutableMap.<String, String>builder()
-        .put(DruidK8sConstants.TASK, 
Base64Compression.compressBase64(mapper.writeValueAsString(task)))
+    ImmutableMap.Builder<String, String> podTemplateAnnotationBuilder = 
ImmutableMap.<String, String>builder()
         .put(DruidK8sConstants.TLS_ENABLED, 
String.valueOf(node.isEnableTlsPort()))
         .put(DruidK8sConstants.TASK_ID, task.getId())
         .put(DruidK8sConstants.TASK_TYPE, task.getType())
         .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
-        .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
-        .build();
+        .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource());
+
+    if (!shouldUseDeepStorageForTaskPayload(task)) {
+      podTemplateAnnotationBuilder
+          .put(DruidK8sConstants.TASK, 
Base64Compression.compressBase64(mapper.writeValueAsString(task)));
+    }
+    return podTemplateAnnotationBuilder.build();
   }
   
   private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config, 
Task task)
@@ -276,4 +322,11 @@ public class PodTemplateTaskAdapter implements TaskAdapter
   {
     return DruidK8sConstants.DRUID_LABEL_PREFIX + baseLabel;
   }
+
+  @Override
+  public boolean shouldUseDeepStorageForTaskPayload(Task task) throws 
IOException
+  {
+    String compressedTaskPayload = 
Base64Compression.compressBase64(mapper.writeValueAsString(task));
+    return compressedTaskPayload.length() > 
DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
index c64acd15310..be0588c35b1 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
@@ -32,6 +32,7 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -47,10 +48,11 @@ public class SingleContainerTaskAdapter extends 
K8sTaskAdapter
       TaskConfig taskConfig,
       StartupLoggingConfig startupLoggingConfig,
       DruidNode druidNode,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskLogs taskLogs
   )
   {
-    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper);
+    super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, 
druidNode, mapper, taskLogs);
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
index 05933604f2b..9dacb213cf3 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
@@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord.taskadapter;
 
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
 
 import java.io.IOException;
 
@@ -31,4 +32,10 @@ public interface TaskAdapter
 
   Task toTask(Job from) throws IOException;
 
+  K8sTaskId getTaskId(Job from);
+
+  /**
+   * Method for exposing to external classes whether the task has its task 
payload bundled by the adapter or relies on an external system
+   */
+  boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException;
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 4c46c278e26..1c6e429a3dc 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -79,7 +79,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
   }
 
   @Test
-  public void test_run()
+  public void test_run() throws IOException
   {
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
@@ -114,7 +114,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     replayAll();
 
-    TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
+    TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, false);
 
     verifyAll();
 
@@ -124,7 +124,51 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
   }
 
   @Test
-  public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
+  public void test_run_useTaskManager() throws IOException
+  {
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    )
+    {
+      @Override
+      protected synchronized TaskStatus join(long timeout)
+      {
+        return TaskStatus.success(ID);
+      }
+    };
+
+    Job job = new 
JobBuilder().withNewMetadata().withName(ID).endMetadata().build();
+
+    EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
+        EasyMock.eq(job),
+        EasyMock.eq(task),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS)
+    )).andReturn(null);
+    Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
+
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
+
+    taskLogs.pushTaskPayload(EasyMock.anyString(), EasyMock.anyObject());
+    replayAll();
+
+    TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, true);
+
+    verifyAll();
+    Assert.assertTrue(taskStatus.isSuccess());
+    Assert.assertEquals(ID, taskStatus.getId());
+    Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, 
peonLifecycle.getState());
+  }
+
+  @Test
+  public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() 
throws IOException
   {
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
@@ -159,12 +203,12 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     replayAll();
 
-    peonLifecycle.run(job, 0L, 0L);
+    peonLifecycle.run(job, 0L, 0L, false);
 
     Assert.assertThrows(
         "Task [id] failed to run: invalid peon lifecycle state transition 
[STOPPED]->[PENDING]",
         IllegalStateException.class,
-        () -> peonLifecycle.run(job, 0L, 0L)
+        () -> peonLifecycle.run(job, 0L, 0L, false)
     );
 
     verifyAll();
@@ -208,7 +252,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     Assert.assertThrows(
         Exception.class,
-        () -> peonLifecycle.run(job, 0L, 0L)
+        () -> peonLifecycle.run(job, 0L, 0L, false)
     );
 
     verifyAll();
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index e6b1b8006af..36a7b4cfcd9 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -221,10 +221,12 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     TaskStatus taskStatus = TaskStatus.success(task.getId());
 
     EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+    
EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
     EasyMock.expect(kubernetesPeonLifecycle.run(
         EasyMock.eq(job),
         EasyMock.anyLong(),
-        EasyMock.anyLong()
+        EasyMock.anyLong(),
+        EasyMock.anyBoolean()
     )).andReturn(taskStatus);
 
     replayAll();
@@ -256,10 +258,12 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
         .build();
 
     EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+    
EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
     EasyMock.expect(kubernetesPeonLifecycle.run(
         EasyMock.eq(job),
         EasyMock.anyLong(),
-        EasyMock.anyLong()
+        EasyMock.anyLong(),
+        EasyMock.anyBoolean()
     )).andThrow(new IllegalStateException());
 
     replayAll();
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 22e2311bbba..09816168588 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -129,7 +129,8 @@ public class DruidPeonClientIntegrationTest
         taskConfig,
         startupLoggingConfig,
         druidNode,
-        jsonMapper
+        jsonMapper,
+        null
     );
     String taskBasePath = "/home/taskDir";
     PeonCommandContext context = new 
PeonCommandContext(Collections.singletonList(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 19701e4f26e..c1100ccc29d 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -37,10 +37,13 @@ import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.JobList;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -52,6 +55,7 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.KubernetesExecutor;
 import 
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
@@ -59,15 +63,23 @@ import 
org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -82,6 +94,8 @@ class K8sTaskAdapterTest
   private final TaskConfig taskConfig;
   private final DruidNode node;
   private final ObjectMapper jsonMapper;
+  private final TaskLogs taskLogs;
+
 
   public K8sTaskAdapterTest()
   {
@@ -105,6 +119,7 @@ class K8sTaskAdapterTest
     );
     startupLoggingConfig = new StartupLoggingConfig();
     taskConfig = new 
TaskConfigBuilder().setBaseDir("src/test/resources").build();
+    taskLogs = new NoopTaskLogs();
   }
 
   @Test
@@ -139,7 +154,9 @@ class K8sTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         node,
-        jsonMapper
+        jsonMapper,
+        taskLogs
+
     );
     Task task = K8sTestUtils.getTask();
     Job jobFromSpec = adapter.fromTask(task);
@@ -166,7 +183,8 @@ class K8sTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         node,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     Task task = K8sTestUtils.getTask();
     Job jobFromSpec = adapter.createJobFromPodSpec(
@@ -189,6 +207,169 @@ class K8sTaskAdapterTest
     assertEquals(task, taskFromJob);
   }
 
+  @Test
+  public void fromTask_dontSetTaskJSON() throws IOException
+  {
+    final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
+    TestKubernetesClient testClient = new TestKubernetesClient(client)
+    {
+      @SuppressWarnings("unchecked")
+      @Override
+      public <T> T executeRequest(KubernetesExecutor<T> executor) throws 
KubernetesResourceNotFoundException
+      {
+        return (T) new Pod()
+        {
+          @Override
+          public PodSpec getSpec()
+          {
+            return podSpec;
+          }
+        };
+      }
+    };
+
+    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+        .withNamespace("test")
+        .build();
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        taskLogs
+    );
+    Task task = new NoopTask(
+        "id",
+        "id",
+        "datasource",
+        0,
+        0,
+        ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) 
DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
+    );
+    Job job = adapter.fromTask(task);
+    // TASK_JSON should not be set in env variables
+    Assertions.assertFalse(
+        job.getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0).getEnv()
+            .stream().anyMatch(env -> 
env.getName().equals(DruidK8sConstants.TASK_JSON_ENV))
+    );
+
+    // --taskId <TASK_ID> should be passed to the peon command args
+    Assertions.assertTrue(
+        Arrays.stream(job.getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0)
+            .getArgs()
+            .get(0).split(" ")).collect(Collectors.toSet())
+            .containsAll(ImmutableList.of("--taskId", task.getId()))
+    );
+  }
+
+  @Test
+  public void toTask_useTaskPayloadManager() throws IOException
+  {
+    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+        .withNamespace("test")
+        .build();
+    Task taskInTaskPayloadManager = K8sTestUtils.getTask();
+    TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+    
Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of(
+        new 
ByteArrayInputStream(jsonMapper.writeValueAsString(taskInTaskPayloadManager).getBytes(Charset.defaultCharset()))
+    ));
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        mockTestLogs
+    );
+
+    Job job = new JobBuilder()
+        .editMetadata().withName("job").endMetadata()
+        .editSpec().editTemplate().editMetadata()
+        .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+        .endMetadata().editSpec().addToContainers(new 
ContainerBuilder().withName("main").build()).endSpec().endTemplate().endSpec().build();
+
+    Task taskFromJob = adapter.toTask(job);
+    assertEquals(taskInTaskPayloadManager, taskFromJob);
+  }
+
+  @Test
+  public void getTaskId()
+  {
+    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    KubernetesTaskRunnerConfig config = 
KubernetesTaskRunnerConfig.builder().build();
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+        .endMetadata().endTemplate().endSpec().build();
+
+    assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
+  }
+
+  @Test
+  public void getTaskId_noAnnotations()
+  {
+    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    KubernetesTaskRunnerConfig config = 
KubernetesTaskRunnerConfig.builder().build();
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .endMetadata().endTemplate().endSpec()
+        .editMetadata().withName("job").endMetadata().build();
+
+    Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+  }
+
+  @Test
+  public void getTaskId_missingTaskIdAnnotation()
+  {
+    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    KubernetesTaskRunnerConfig config = 
KubernetesTaskRunnerConfig.builder().build();
+    K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+        .endMetadata().endTemplate().endSpec()
+        .editMetadata().withName("job").endMetadata().build();
+
+    Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+  }
   @Test
   void testGrabbingTheLastXmxValueFromACommand()
   {
@@ -282,7 +463,8 @@ class K8sTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         node,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     Task task = K8sTestUtils.getTask();
     // no monitor in overlord, no monitor override
@@ -305,7 +487,8 @@ class K8sTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         node,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     adapter.addEnvironmentVariables(container, context, task.toString());
     EnvVar env = container.getEnv()
@@ -322,7 +505,8 @@ class K8sTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         node,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     container.getEnv().add(new EnvVarBuilder()
                                .withName("druid_monitoring_monitors")
@@ -347,13 +531,16 @@ class K8sTaskAdapterTest
     KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
         .withNamespace("test")
         .build();
-    SingleContainerTaskAdapter adapter =
-        new SingleContainerTaskAdapter(testClient,
-                                       config, taskConfig,
-                                       startupLoggingConfig,
-                                       node,
-                                       jsonMapper
-        );
+
+    SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        node,
+        jsonMapper,
+        taskLogs
+    );
     NoopTask task = K8sTestUtils.createTask("id", 1);
     Job actual = adapter.createJobFromPodSpec(
         pod.getSpec(),
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index ac6e32d140e..45ea0873376 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -41,6 +41,8 @@ import 
org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,6 +60,7 @@ class MultiContainerTaskAdapterTest
   private TaskConfig taskConfig;
   private DruidNode druidNode;
   private ObjectMapper jsonMapper;
+  @Mock private TaskLogs taskLogs;
 
   @BeforeEach
   public void setup()
@@ -98,7 +101,8 @@ class MultiContainerTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         druidNode,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     NoopTask task = K8sTestUtils.createTask("id", 1);
     Job actual = adapter.createJobFromPodSpec(
@@ -146,7 +150,8 @@ class MultiContainerTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         druidNode,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     NoopTask task = K8sTestUtils.createTask("id", 1);
     PodSpec spec = pod.getSpec();
@@ -191,12 +196,16 @@ class MultiContainerTaskAdapterTest
         .withPrimaryContainerName("primary")
         
.withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
         .build();
-    MultiContainerTaskAdapter adapter = new 
MultiContainerTaskAdapter(testClient,
-                                                                       config,
-                                                                       
taskConfig,
-                                                                       
startupLoggingConfig,
-                                                                       
druidNode,
-                                                                       
jsonMapper);
+
+    MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+        testClient,
+        config,
+        taskConfig,
+        startupLoggingConfig,
+        druidNode,
+        jsonMapper,
+        taskLogs
+    );
     NoopTask task = K8sTestUtils.createTask("id", 1);
     PodSpec spec = pod.getSpec();
     K8sTaskAdapter.massageSpec(spec, config.getPrimaryContainerName());
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index dd4eadfea29..74dfacd1a32 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -20,30 +20,39 @@
 package org.apache.druid.k8s.overlord.taskadapter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import io.fabric8.kubernetes.api.model.PodTemplate;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
 import org.apache.druid.k8s.overlord.common.Base64Compression;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
 import org.easymock.EasyMock;
+import org.easymock.Mock;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
@@ -51,6 +60,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class PodTemplateTaskAdapterTest
 {
   @TempDir private Path tempDir;
@@ -59,6 +70,7 @@ public class PodTemplateTaskAdapterTest
   private TaskConfig taskConfig;
   private DruidNode node;
   private ObjectMapper mapper;
+  @Mock private TaskLogs taskLogs;
 
   @BeforeEach
   public void setup()
@@ -89,7 +101,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        new Properties()
+        new Properties(),
+        taskLogs
     ));
   }
 
@@ -109,7 +122,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     ));
   }
 
@@ -127,7 +141,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -159,7 +174,8 @@ public class PodTemplateTaskAdapterTest
             true
         ),
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -185,7 +201,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     ));
   }
 
@@ -204,7 +221,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -215,7 +233,41 @@ public class PodTemplateTaskAdapterTest
   }
 
   @Test
-  public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
+  public void 
test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON() throws 
IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.runner.k8s.podTemplate.base", 
templatePath.toString());
+    props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", 
templatePath.toString());
+
+    PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        props,
+        taskLogs
+    );
+
+    Task task = new NoopTask(
+        "id",
+        "id",
+        "datasource",
+        0,
+        0,
+        ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) 
DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
+    );
+
+    Job actual = adapter.fromTask(task);
+    Job expected = 
K8sTestUtils.fileToResource("expectedNoopJobNoTaskJson.yaml", Job.class);
+
+    Assertions.assertEquals(actual, expected);
+  }
+
+  @Test
+  public void test_fromTask_withoutAnnotations_throwsDruidException() throws 
IOException
   {
     Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
     mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -228,17 +280,91 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", 
Job.class);
 
 
-    Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+    Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
+  }
+
+  @Test
+  public void test_getTaskId() throws IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.runner.k8s.podTemplate.base", 
templatePath.toString());
+    PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        props,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+        .endMetadata().endTemplate().endSpec().build();
+
+    assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
   }
 
   @Test
-  public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws 
IOException
+  public void test_getTaskId_noAnnotations() throws IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.runner.k8s.podTemplate.base", 
templatePath.toString());
+    PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        props,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .endMetadata().endTemplate().endSpec()
+        .editMetadata().withName("job").endMetadata().build();
+
+    Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+  }
+
+  @Test
+  public void test_getTaskId_missingTaskIdAnnotation() throws IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.runner.k8s.podTemplate.base", 
templatePath.toString());
+    PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        props,
+        taskLogs
+    );
+    Job job = new JobBuilder()
+        .editSpec().editTemplate().editMetadata()
+        .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+        .endMetadata().endTemplate().endSpec()
+        .editMetadata().withName("job").endMetadata().build();
+
+    Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+  }
+
+  @Test
+  public void test_toTask_withoutTaskAnnotation_throwsIOE() throws IOException
   {
     Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
     mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -251,7 +377,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Job baseJob = 
K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@@ -265,11 +392,11 @@ public class PodTemplateTaskAdapterTest
         .endTemplate()
         .endSpec()
         .build();
-    Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+    Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
   }
 
   @Test
-  public void test_fromTask() throws IOException
+  public void test_toTask() throws IOException
   {
     Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
     mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -282,7 +409,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
@@ -292,6 +420,35 @@ public class PodTemplateTaskAdapterTest
     Assertions.assertEquals(expected, actual);
   }
 
+  @Test
+  public void test_toTask_useTaskPayloadManager() throws IOException
+  {
+    Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+    mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+    Properties props = new Properties();
+    props.put("druid.indexer.runner.k8s.podTemplate.base", 
templatePath.toString());
+
+    Task expected = new NoopTask("id", null, "datasource", 0, 0, 
ImmutableMap.of());
+    TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+    Mockito.when(mockTestLogs.streamTaskPayload("id")).thenReturn(Optional.of(
+        new 
ByteArrayInputStream(mapper.writeValueAsString(expected).getBytes(Charset.defaultCharset()))
+    ));
+
+    PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        props,
+        mockTestLogs
+    );
+
+    Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
+    Task actual = adapter.toTask(job);
+    Assertions.assertEquals(expected, actual);
+  }
+
   @Test
   public void test_fromTask_withRealIds() throws IOException
   {
@@ -307,7 +464,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Task task = new NoopTask(
@@ -340,7 +498,8 @@ public class PodTemplateTaskAdapterTest
         taskConfig,
         node,
         mapper,
-        props
+        props,
+        taskLogs
     );
 
     Task task = EasyMock.mock(Task.class);
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index 10e129a9c2b..43a40daedc1 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -39,6 +39,8 @@ import 
org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -57,6 +59,8 @@ class SingleContainerTaskAdapterTest
   private DruidNode druidNode;
   private ObjectMapper jsonMapper;
 
+  @Mock private TaskLogs taskLogs;
+
   @BeforeEach
   public void setup()
   {
@@ -96,7 +100,8 @@ class SingleContainerTaskAdapterTest
         taskConfig,
         startupLoggingConfig,
         druidNode,
-        jsonMapper
+        jsonMapper,
+        taskLogs
     );
     NoopTask task = K8sTestUtils.createTask("id", 1);
     Job actual = adapter.createJobFromPodSpec(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index 0998d592fee..2cef837f397 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -42,11 +42,11 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_SEGMENTS"
+              value: "false"
             - name: "TASK_JSON"
               valueFrom:
                 fieldRef:
                   fieldPath: "metadata.annotations['task']"
-            - name: "LOAD_BROADCAST_SEGMENTS"
-              value: "false"
           image: one
           name: primary
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index bb8f64c5e5e..cf16c49c5db 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -42,11 +42,11 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: 
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
+            - name: "LOAD_BROADCAST_SEGMENTS"
+              value: "false"
             - name: "TASK_JSON"
               valueFrom:
                 fieldRef:
                   fieldPath: "metadata.annotations['task']"
-            - name: "LOAD_BROADCAST_SEGMENTS"
-              value: "false"
           image: one
           name: primary
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
similarity index 77%
copy from 
extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
copy to 
extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 0998d592fee..d72d0ef37b0 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -26,7 +26,6 @@ spec:
         druid.task.group.id: "id"
         druid.task.datasource: "datasource"
       annotations:
-        task: 
"H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
         tls.enabled: "false"
         task.id: "id"
         task.type: "noop"
@@ -42,10 +41,6 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
-            - name: "TASK_JSON"
-              valueFrom:
-                fieldRef:
-                  fieldPath: "metadata.annotations['task']"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
           image: one
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
index e6762af63cf..a230ac913a6 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -42,11 +42,11 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_SEGMENTS"
+              value: "false"
             - name: "TASK_JSON"
               valueFrom:
                 fieldRef:
                   fieldPath: "metadata.annotations['task']"
-            - name: "LOAD_BROADCAST_SEGMENTS"
-              value: "false"
           image: one
           name: primary
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 53112d0808d..b9fd9e42862 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -83,6 +83,21 @@ public class S3TaskLogs implements TaskLogs
     return streamTaskFileWithRetry(0, taskKey);
   }
 
+  @Override
+  public void pushTaskPayload(String taskid, File taskPayloadFile) throws 
IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "task.json");
+    log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, 
taskKey);
+    pushTaskFile(taskPayloadFile, taskKey);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskid) throws 
IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "task.json");
+    return streamTaskFileWithRetry(0, taskKey);
+  }
+
   /**
    * Using the retry conditions defined in {@link S3Utils#S3RETRY}.
    */
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 7428081aa3e..011dc488845 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -35,8 +35,11 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
 import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -55,6 +58,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -143,6 +147,78 @@ public class S3TaskLogsTest extends EasyMockSupport
     EasyMock.verify(s3Client);
   }
 
+  @Test
+  public void test_pushTaskPayload() throws IOException
+  {
+    Capture<PutObjectRequest> putObjectRequestCapture = 
Capture.newInstance(CaptureType.FIRST);
+    
EasyMock.expect(s3Client.putObject(EasyMock.capture(putObjectRequestCapture)))
+        .andReturn(new PutObjectResult())
+        .once();
+
+    EasyMock.replay(s3Client);
+
+    S3TaskLogsConfig config = new S3TaskLogsConfig();
+    config.setS3Bucket(TEST_BUCKET);
+    config.setS3Prefix("prefix");
+    config.setDisableAcl(true);
+
+    CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+
+    File payloadFile = tempFolder.newFile("task.json");
+    String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+    s3TaskLogs.pushTaskPayload(taskId, payloadFile);
+
+    PutObjectRequest putObjectRequest = putObjectRequestCapture.getValue();
+    Assert.assertEquals(TEST_BUCKET, putObjectRequest.getBucketName());
+    Assert.assertEquals("prefix/" + taskId + "/task.json", 
putObjectRequest.getKey());
+    Assert.assertEquals(payloadFile, putObjectRequest.getFile());
+    EasyMock.verify(s3Client);
+  }
+
+  @Test
+  public void test_streamTaskPayload() throws IOException
+  {
+    String taskPayloadString = "task payload";
+
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(taskPayloadString.length());
+    EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyObject(), 
EasyMock.anyObject()))
+        .andReturn(objectMetadata)
+        .once();
+
+    InputStream taskPayload = new 
ByteArrayInputStream(taskPayloadString.getBytes(Charset.defaultCharset()));
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(taskPayload);
+    Capture<GetObjectRequest> getObjectRequestCapture = 
Capture.newInstance(CaptureType.FIRST);
+    
EasyMock.expect(s3Client.getObject(EasyMock.capture(getObjectRequestCapture)))
+        .andReturn(s3Object)
+        .once();
+
+    EasyMock.replay(s3Client);
+
+    S3TaskLogsConfig config = new S3TaskLogsConfig();
+    config.setS3Bucket(TEST_BUCKET);
+    config.setS3Prefix("prefix");
+    config.setDisableAcl(true);
+
+    CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+
+    String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+    Optional<InputStream> payloadResponse = 
s3TaskLogs.streamTaskPayload(taskId);
+
+    GetObjectRequest getObjectRequest = getObjectRequestCapture.getValue();
+    Assert.assertEquals(TEST_BUCKET, getObjectRequest.getBucketName());
+    Assert.assertEquals("prefix/" + taskId + "/task.json", 
getObjectRequest.getKey());
+    Assert.assertTrue(payloadResponse.isPresent());
+
+    Assert.assertEquals(taskPayloadString, 
IOUtils.toString(payloadResponse.get(), Charset.defaultCharset()));
+    EasyMock.verify(s3Client);
+  }
+
   @Test
   public void test_killAll_noException_deletesAllTaskLogs() throws IOException
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
 
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
index be2c62540e0..6e7df12f8d2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
@@ -29,6 +29,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogKiller;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogs;
+import org.apache.druid.tasklogs.TaskPayloadManager;
 
 /**
  */
@@ -48,5 +49,6 @@ public class IndexingServiceTaskLogsModule implements Module
 
     binder.bind(TaskLogPusher.class).to(TaskLogs.class);
     binder.bind(TaskLogKiller.class).to(TaskLogs.class);
+    binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/error/InternalServerError.java 
b/processing/src/main/java/org/apache/druid/error/InternalServerError.java
new file mode 100644
index 00000000000..b730acb0e3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/error/InternalServerError.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+public class InternalServerError extends DruidException.Failure
+{
+  public static DruidException exception(String errorCode, String msg, 
Object... args)
+  {
+    return exception(null, errorCode, msg, args);
+  }
+  public static DruidException exception(Throwable t, String errorCode, String 
msg, Object... args)
+  {
+    return DruidException.fromFailure(new InternalServerError(t, errorCode, 
msg, args));
+  }
+
+  private final Throwable t;
+  private final String msg;
+  private final Object[] args;
+
+  private InternalServerError(
+      Throwable t,
+      String errorCode,
+      String msg,
+      Object... args
+  )
+  {
+    super(errorCode);
+    this.t = t;
+    this.msg = msg;
+    this.args = args;
+  }
+
+  @Override
+  public DruidException makeException(DruidException.DruidExceptionBuilder bob)
+  {
+    bob = bob.forPersona(DruidException.Persona.OPERATOR)
+        .ofCategory(DruidException.Category.RUNTIME_FAILURE);
+
+    if (t == null) {
+      return bob.build(msg, args);
+    } else {
+      return bob.build(t, msg, args);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java 
b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index 287b2f6fcc3..5568a5160fd 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -64,4 +64,16 @@ public class NoopTaskLogs implements TaskLogs
   {
     log.info("Noop: No task logs are deleted.");
   }
+
+  @Override
+  public void pushTaskPayload(String taskid, File taskPayloadFile)
+  {
+    log.info("Not pushing payload for task: %s", taskid);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskid)
+  {
+    return Optional.absent();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java 
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
index ee50217c957..2756911f6a3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
@@ -21,7 +21,8 @@ package org.apache.druid.tasklogs;
 
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
+
 @ExtensionPoint
-public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller
+public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, 
TaskLogKiller, TaskPayloadManager
 {
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java 
b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
new file mode 100644
index 00000000000..41db8e4556b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Something that knows how to push a task payload, before the task is run, to 
somewhere
+ * an ingestion worker will be able to stream the task payload from when trying 
to run the task.
+ */
+@ExtensionPoint
+public interface TaskPayloadManager
+{
+  /**
+   * Save payload so it can be retrieved later.
+   *
+   * @param taskPayloadFile file holding the task payload to save
+   */
+  default void pushTaskPayload(String taskid, File taskPayloadFile) throws 
IOException
+  {
+    throw new NotImplementedException(StringUtils.format("this 
druid.indexer.logs.type [%s] does not support managing task payloads yet. You 
will have to switch to using environment variables", getClass()));
+  }
+
+  /**
+   * Stream payload for a task.
+   *
+   * @return inputStream for this taskPayload, if available
+   */
+  default Optional<InputStream> streamTaskPayload(String taskid) throws 
IOException
+  {
+    throw new NotImplementedException(StringUtils.format("this 
druid.indexer.logs.type [%s] does not support managing task payloads yet. You 
will have to switch to using environment variables", getClass()));
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java 
b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java
new file mode 100644
index 00000000000..b28296b2c41
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+import org.apache.druid.matchers.DruidMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class InternalServerErrorTest
+{
+
+  @Test
+  public void testAsErrorResponse()
+  {
+    ErrorResponse errorResponse = new 
ErrorResponse(InternalServerError.exception("runtimeFailure", "Internal Server 
Error"));
+    final Map<String, Object> asMap = errorResponse.getAsMap();
+
+    MatcherAssert.assertThat(
+        asMap,
+        DruidMatchers.mapMatcher(
+            "error", "druidException",
+            "errorCode", "runtimeFailure",
+            "persona", "OPERATOR",
+            "category", "RUNTIME_FAILURE",
+            "errorMessage", "Internal Server Error"
+        )
+    );
+
+    ErrorResponse recomposed = ErrorResponse.fromMap(asMap);
+
+    MatcherAssert.assertThat(
+        recomposed.getUnderlyingException(),
+        new DruidExceptionMatcher(
+            DruidException.Persona.OPERATOR,
+            DruidException.Category.RUNTIME_FAILURE,
+            "runtimeFailure"
+        ).expectMessageContains("Internal Server Error")
+    );
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java 
b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
index 30192932a9e..2da98dab303 100644
--- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
@@ -32,4 +32,11 @@ public class NoopTaskLogsTest
     TaskLogs taskLogs = new NoopTaskLogs();
     Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
   }
+
+  @Test
+  public void test_streamTaskPayload() throws IOException
+  {
+    TaskLogs taskLogs = new NoopTaskLogs();
+    Assert.assertFalse(taskLogs.streamTaskPayload("id").isPresent());
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java 
b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
similarity index 68%
copy from 
processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
copy to 
processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
index 30192932a9e..4c53061a86f 100644
--- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
@@ -19,17 +19,25 @@
 
 package org.apache.druid.tasklogs;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-
-public class NoopTaskLogsTest
+public class TaskPayloadManagerTest implements TaskPayloadManager
 {
   @Test
-  public void test_streamTaskStatus() throws IOException
+  public void test_streamTaskPayload()
+  {
+    Assert.assertThrows(NotImplementedException.class,
+        () -> this.streamTaskPayload("id")
+    );
+  }
+
+  @Test
+  public void test_pushTaskPayload()
   {
-    TaskLogs taskLogs = new NoopTaskLogs();
-    Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
+    Assert.assertThrows(NotImplementedException.class,
+        () -> this.pushTaskPayload("id", null)
+    );
   }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 50dc64a1e06..a18e40b74ac 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -40,6 +40,8 @@ import com.google.inject.multibindings.MapBinder;
 import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 import io.netty.util.SuppressForbidden;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.curator.ZkEnablementConfig;
 import org.apache.druid.discovery.NodeRole;
@@ -130,10 +132,12 @@ import 
org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.apache.druid.server.metrics.ServiceStatusMonitor;
+import org.apache.druid.tasklogs.TaskPayloadManager;
 import org.eclipse.jetty.server.Server;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
@@ -177,6 +181,9 @@ public class CliPeon extends GuiceRunnable
   @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", 
description = "Enable loading of broadcast segments")
   public String loadBroadcastSegments = "false";
 
+  @Option(name = "--taskId", title = "taskId", description = "TaskId for 
fetching task.json remotely")
+  public String taskId = "";
+
   private static final Logger log = new Logger(CliPeon.class);
 
   private Properties properties;
@@ -286,9 +293,15 @@ public class CliPeon extends GuiceRunnable
 
           @Provides
           @LazySingleton
-          public Task readTask(@Json ObjectMapper mapper, 
ExecutorLifecycleConfig config)
+          public Task readTask(@Json ObjectMapper mapper, 
ExecutorLifecycleConfig config, TaskPayloadManager taskPayloadManager)
           {
             try {
+              if (!config.getTaskFile().exists() || 
config.getTaskFile().length() == 0) {
+                log.info("Task file not found, trying to pull task payload 
from deep storage");
+                String task = 
IOUtils.toString(taskPayloadManager.streamTaskPayload(taskId).get(), 
Charset.defaultCharset());
+                // write the remote task.json to the task file location for 
ExecutorLifecycle to pick up
+                FileUtils.write(config.getTaskFile(), task, 
Charset.defaultCharset());
+              }
               return mapper.readValue(config.getTaskFile(), Task.class);
             }
             catch (IOException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to