This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 64754b6799a Allow users to pass task payload via deep storage instead
of environment variable (#14887)
64754b6799a is described below
commit 64754b6799afcebc0943a18e3099069717fbd077
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Oct 3 04:38:59 2023 -0400
Allow users to pass task payload via deep storage instead of environment
variable (#14887)
This change fixes an issue where passing too large a task payload to the
mm-less task runner causes the peon to fail to start up, because the payload
is passed (compressed) as an environment variable (TASK_JSON). On Linux
systems the limit for an environment variable is commonly 128KB; on Windows
systems it is lower. Setting an environment variable longer than this limit
results in "Argument list too long" errors.
---
distribution/docker/peon.sh | 5 +-
.../k8s/overlord/KubernetesPeonLifecycle.java | 31 ++-
.../druid/k8s/overlord/KubernetesTaskRunner.java | 3 +-
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 10 +-
.../k8s/overlord/common/DruidK8sConstants.java | 1 +
.../k8s/overlord/taskadapter/K8sTaskAdapter.java | 69 ++++++-
.../taskadapter/MultiContainerTaskAdapter.java | 6 +-
.../taskadapter/PodTemplateTaskAdapter.java | 87 +++++++--
.../taskadapter/SingleContainerTaskAdapter.java | 6 +-
.../k8s/overlord/taskadapter/TaskAdapter.java | 7 +
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 56 +++++-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 8 +-
.../DruidPeonClientIntegrationTest.java | 3 +-
.../overlord/taskadapter/K8sTaskAdapterTest.java | 211 +++++++++++++++++++--
.../taskadapter/MultiContainerTaskAdapterTest.java | 25 ++-
.../taskadapter/PodTemplateTaskAdapterTest.java | 193 +++++++++++++++++--
.../SingleContainerTaskAdapterTest.java | 7 +-
.../src/test/resources/expectedNoopJob.yaml | 4 +-
.../src/test/resources/expectedNoopJobLongIds.yaml | 4 +-
...NoopJob.yaml => expectedNoopJobNoTaskJson.yaml} | 5 -
.../test/resources/expectedNoopJobTlsEnabled.yaml | 4 +-
.../org/apache/druid/storage/s3/S3TaskLogs.java | 15 ++
.../apache/druid/storage/s3/S3TaskLogsTest.java | 76 ++++++++
.../druid/guice/IndexingServiceTaskLogsModule.java | 2 +
.../apache/druid/error/InternalServerError.java | 62 ++++++
.../org/apache/druid/tasklogs/NoopTaskLogs.java | 12 ++
.../java/org/apache/druid/tasklogs/TaskLogs.java | 3 +-
.../apache/druid/tasklogs/TaskPayloadManager.java | 57 ++++++
.../druid/error/InternalServerErrorTest.java | 59 ++++++
.../apache/druid/tasklogs/NoopTaskLogsTest.java | 7 +
...skLogsTest.java => TaskPayloadManagerTest.java} | 20 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 15 +-
32 files changed, 967 insertions(+), 106 deletions(-)
diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh
index 66e34c99744..b5ec89ea8d4 100755
--- a/distribution/docker/peon.sh
+++ b/distribution/docker/peon.sh
@@ -149,7 +149,8 @@ then
mkdir -p ${DRUID_DIRS_TO_CREATE}
fi
-# take the ${TASK_JSON} environment variable and base64 decode, unzip and
throw it in ${TASK_DIR}/task.json
-mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d >
${TASK_DIR}/task.json;
+# take the ${TASK_JSON} environment variable and base64 decode, unzip and
throw it in ${TASK_DIR}/task.json.
+# If TASK_JSON is not set, CliPeon will pull the task.json file from deep
storage.
+mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d |
gzip -d > ${TASK_DIR}/task.json;
exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*:
org.apache.druid.cli.Main internal peon $@
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 4814d8cbb60..5c6c7c6b3eb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -42,6 +42,7 @@ import
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -71,9 +72,9 @@ public class KubernetesPeonLifecycle
protected enum State
{
- /** Lifecycle's state before {@link #run(Job, long, long)} or {@link
#join(long)} is called. */
+ /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or
{@link #join(long)} is called. */
NOT_STARTED,
- /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+ /** Lifecycle's state since {@link #run(Job, long, long, boolean)} is
called. */
PENDING,
/** Lifecycle's state since {@link #join(long)} is called. */
RUNNING,
@@ -88,7 +89,6 @@ public class KubernetesPeonLifecycle
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
-
@MonotonicNonNull
private LogWatch logWatch;
@@ -119,11 +119,15 @@ public class KubernetesPeonLifecycle
* @return
* @throws IllegalStateException
*/
- protected synchronized TaskStatus run(Job job, long launchTimeout, long
timeout) throws IllegalStateException
+ protected synchronized TaskStatus run(Job job, long launchTimeout, long
timeout, boolean useDeepStorageForTaskPayload) throws IllegalStateException,
IOException
{
try {
updateState(new State[]{State.NOT_STARTED}, State.PENDING);
+ if (useDeepStorageForTaskPayload) {
+ writeTaskPayload(task);
+ }
+
// In case something bad happens and run is called twice on this
KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
@@ -144,6 +148,25 @@ public class KubernetesPeonLifecycle
}
}
+ private void writeTaskPayload(Task task) throws IOException
+ {
+ Path file = null;
+ try {
+ file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
+ FileUtils.writeStringToFile(file.toFile(),
mapper.writeValueAsString(task), Charset.defaultCharset());
+ taskLogs.pushTaskPayload(task.getId(), file.toFile());
+ }
+ catch (Exception e) {
+ log.error("Failed to write task payload for task: %s",
taskId.getOriginalTaskId());
+ throw new RuntimeException(e);
+ }
+ finally {
+ if (file != null) {
+ Files.deleteIfExists(file);
+ }
+ }
+ }
+
/**
* Join existing Kubernetes Job
*
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 33efd848d0a..a0a29dcbbb9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -193,7 +193,8 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
- config.getTaskTimeout().toStandardDuration().getMillis()
+ config.getTaskTimeout().toStandardDuration().getMillis(),
+ adapter.shouldUseDeepStorageForTaskPayload(task)
);
} else {
taskStatus = peonLifecycle.join(
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 92fc220e621..72d7ef0c00d 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;
-
@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
@@ -137,7 +136,8 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
taskConfig,
startupLoggingConfig,
druidNode,
- smileMapper
+ smileMapper,
+ taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
@@ -145,7 +145,8 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
taskConfig,
druidNode,
smileMapper,
- properties
+ properties,
+ taskLogs
);
} else {
return new SingleContainerTaskAdapter(
@@ -154,7 +155,8 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
taskConfig,
startupLoggingConfig,
druidNode,
- smileMapper
+ smileMapper,
+ taskLogs
);
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index a5380bef3f2..7d35827f89b 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -40,5 +40,6 @@ public class DruidK8sConstants
public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";
public static final String LABEL_KEY = "druid.k8s.peons";
public static final String DRUID_LABEL_PREFIX = "druid.";
+ public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB
static final Predicate<Throwable> IS_TRANSIENT = e -> e instanceof
KubernetesResourceNotFoundException;
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 712bc1a47e2..862b176b115 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -41,7 +41,10 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
@@ -57,8 +60,11 @@ import
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -89,6 +95,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
+ protected final TaskLogs taskLogs;
public K8sTaskAdapter(
KubernetesClientApi client,
@@ -96,7 +103,8 @@ public abstract class K8sTaskAdapter implements TaskAdapter
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
this.client = client;
@@ -105,6 +113,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
+ this.taskLogs = taskLogs;
}
@Override
@@ -132,11 +141,39 @@ public abstract class K8sTaskAdapter implements
TaskAdapter
Optional<EnvVar> taskJson = envVars.stream().filter(x ->
"TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar ->
taskJson.get().getValue()).orElse(null);
if (contents == null) {
- throw new IOException("No TASK_JSON environment variable found in pod: "
+ from.getMetadata().getName());
+ log.info("No TASK_JSON environment variable found in pod: %s. Trying to
load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents),
Task.class);
}
+ private Task toTaskUsingDeepStorage(Job from) throws IOException
+ {
+ com.google.common.base.Optional<InputStream> taskBody =
taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+ if (!taskBody.isPresent()) {
+ throw InternalServerError.exception(
+ "Could not load task payload from deep storage for job [%s]. Check
the overlord logs for any errors in uploading task payload to deep storage.",
+ from.getMetadata().getName()
+ );
+ }
+ String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+ return mapper.readValue(task, Task.class);
+ }
+
+ @Override
+ public K8sTaskId getTaskId(Job from)
+ {
+ Map<String, String> annotations =
from.getSpec().getTemplate().getMetadata().getAnnotations();
+ if (annotations == null) {
+ throw DruidException.defensive().build("No annotations found on pod spec
for job [%s]", from.getMetadata().getName());
+ }
+ String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+ if (taskId == null) {
+ throw DruidException.defensive().build("No task_id annotation found on
pod spec for job [%s]", from.getMetadata().getName());
+ }
+ return new K8sTaskId(taskId);
+ }
+
@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task,
PeonCommandContext context) throws IOException;
@@ -219,15 +256,11 @@ public abstract class K8sTaskAdapter implements
TaskAdapter
.build());
}
- mainContainer.getEnv().addAll(Lists.newArrayList(
+ List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
- new EnvVarBuilder()
- .withName(DruidK8sConstants.TASK_JSON_ENV)
- .withValue(taskContents)
- .build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
@@ -244,7 +277,17 @@ public abstract class K8sTaskAdapter implements TaskAdapter
null,
"metadata.name"
)).build()).build()
- ));
+ );
+
+ if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
+ envVars.add(
+ new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_JSON_ENV)
+ .withValue(taskContents)
+ .build()
+ );
+ }
+ mainContainer.getEnv().addAll(envVars);
}
protected Container setupMainContainer(
@@ -403,6 +446,9 @@ public abstract class K8sTaskAdapter implements TaskAdapter
command.add("--loadBroadcastSegments");
command.add("true");
}
+
+ command.add("--taskId");
+ command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(),
command)
@@ -433,5 +479,12 @@ public abstract class K8sTaskAdapter implements TaskAdapter
}
return requirements;
}
+
+ @Override
+ public boolean shouldUseDeepStorageForTaskPayload(Task task) throws
IOException
+ {
+ String compressedTaskPayload =
Base64Compression.compressBase64(mapper.writeValueAsString(task));
+ return compressedTaskPayload.length() >
DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
index 9cda8f86488..a81154a3bcb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
@@ -43,6 +43,7 @@ import
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
import java.util.Collections;
@@ -59,10 +60,11 @@ public class MultiContainerTaskAdapter extends
K8sTaskAdapter
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
- super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper, taskLogs);
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index a3d10f7dcd1..ef0509a673f 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.databind.cfg.MapperConfig;
import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
@@ -35,11 +35,13 @@ import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.utils.Serialization;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -49,12 +51,16 @@ import
org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -85,13 +91,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private final DruidNode node;
private final ObjectMapper mapper;
private final HashMap<String, PodTemplate> templates;
+ private final TaskLogs taskLogs;
public PodTemplateTaskAdapter(
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
DruidNode node,
ObjectMapper mapper,
- Properties properties
+ Properties properties,
+ TaskLogs taskLogs
)
{
this.taskRunnerConfig = taskRunnerConfig;
@@ -99,6 +107,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
this.node = node;
this.mapper = mapper;
this.templates = initializePodTemplates(properties);
+ this.taskLogs = taskLogs;
}
/**
@@ -163,15 +172,44 @@ public class PodTemplateTaskAdapter implements TaskAdapter
{
Map<String, String> annotations =
from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
- throw new IOE("No annotations found on pod spec for job [%s]",
from.getMetadata().getName());
+ log.info("No annotations found on pod spec for job [%s]. Trying to load
task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
- throw new IOE("No task annotation found on pod spec for job [%s]",
from.getMetadata().getName());
+ log.info("No task annotation found on pod spec for job [%s]. Trying to
load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(task),
Task.class);
}
+ private Task toTaskUsingDeepStorage(Job from) throws IOException
+ {
+ com.google.common.base.Optional<InputStream> taskBody =
taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+ if (!taskBody.isPresent()) {
+ throw InternalServerError.exception(
+ "Could not load task payload from deep storage for job [%s]. Check
the overlord logs for errors uploading task payloads to deep storage.",
+ from.getMetadata().getName()
+ );
+ }
+ String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+ return mapper.readValue(task, Task.class);
+ }
+
+ @Override
+ public K8sTaskId getTaskId(Job from)
+ {
+ Map<String, String> annotations =
from.getSpec().getTemplate().getMetadata().getAnnotations();
+ if (annotations == null) {
+ throw DruidException.defensive().build("No annotations found on pod spec
for job [%s]", from.getMetadata().getName());
+ }
+ String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+ if (taskId == null) {
+ throw DruidException.defensive().build("No task_id annotation found on
pod spec for job [%s]", from.getMetadata().getName());
+ }
+ return new K8sTaskId(taskId);
+ }
+
private HashMap<String, PodTemplate> initializePodTemplates(Properties
properties)
{
HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
@@ -208,9 +246,9 @@ public class PodTemplateTaskAdapter implements TaskAdapter
}
}
- private Collection<EnvVar> getEnv(Task task)
+ private Collection<EnvVar> getEnv(Task task) throws IOException
{
- return ImmutableList.of(
+ List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(taskConfig.getBaseDir())
@@ -219,17 +257,21 @@ public class PodTemplateTaskAdapter implements TaskAdapter
.withName(DruidK8sConstants.TASK_ID_ENV)
.withValue(task.getId())
.build(),
- new EnvVarBuilder()
- .withName(DruidK8sConstants.TASK_JSON_ENV)
- .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new
ObjectFieldSelector(
- null,
- StringUtils.format("metadata.annotations['%s']",
DruidK8sConstants.TASK)
- )).build()).build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
.build()
);
+ if (!shouldUseDeepStorageForTaskPayload(task)) {
+ envVars.add(new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_JSON_ENV)
+ .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new
ObjectFieldSelector(
+ null,
+ StringUtils.format("metadata.annotations['%s']",
DruidK8sConstants.TASK)
+ )).build()).build()
+ );
+ }
+ return envVars;
}
private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig config,
Task task)
@@ -239,14 +281,18 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private Map<String, String> getPodTemplateAnnotations(Task task) throws
IOException
{
- return ImmutableMap.<String, String>builder()
- .put(DruidK8sConstants.TASK,
Base64Compression.compressBase64(mapper.writeValueAsString(task)))
+ ImmutableMap.Builder<String, String> podTemplateAnnotationBuilder =
ImmutableMap.<String, String>builder()
.put(DruidK8sConstants.TLS_ENABLED,
String.valueOf(node.isEnableTlsPort()))
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
- .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
- .build();
+ .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource());
+
+ if (!shouldUseDeepStorageForTaskPayload(task)) {
+ podTemplateAnnotationBuilder
+ .put(DruidK8sConstants.TASK,
Base64Compression.compressBase64(mapper.writeValueAsString(task)));
+ }
+ return podTemplateAnnotationBuilder.build();
}
private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config,
Task task)
@@ -276,4 +322,11 @@ public class PodTemplateTaskAdapter implements TaskAdapter
{
return DruidK8sConstants.DRUID_LABEL_PREFIX + baseLabel;
}
+
+ @Override
+ public boolean shouldUseDeepStorageForTaskPayload(Task task) throws
IOException
+ {
+ String compressedTaskPayload =
Base64Compression.compressBase64(mapper.writeValueAsString(task));
+ return compressedTaskPayload.length() >
DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
index c64acd15310..be0588c35b1 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
@@ -32,6 +32,7 @@ import
org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
import java.util.Collections;
@@ -47,10 +48,11 @@ public class SingleContainerTaskAdapter extends
K8sTaskAdapter
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
- super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper, taskLogs);
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
index 05933604f2b..9dacb213cf3 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
@@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord.taskadapter;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import java.io.IOException;
@@ -31,4 +32,10 @@ public interface TaskAdapter
Task toTask(Job from) throws IOException;
+ K8sTaskId getTaskId(Job from);
+
+ /**
+ * Method for exposing to external classes whether the task has its task
payload bundled by the adapter or relies on a external system
+ */
+ boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException;
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 4c46c278e26..1c6e429a3dc 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -79,7 +79,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
}
@Test
- public void test_run()
+ public void test_run() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
@@ -114,7 +114,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
replayAll();
- TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
+ TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, false);
verifyAll();
@@ -124,7 +124,51 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
}
@Test
- public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
+ public void test_run_useTaskManager() throws IOException
+ {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
+ @Override
+ protected synchronized TaskStatus join(long timeout)
+ {
+ return TaskStatus.success(ID);
+ }
+ };
+
+ Job job = new
JobBuilder().withNewMetadata().withName(ID).endMetadata().build();
+
+ EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
+ EasyMock.eq(job),
+ EasyMock.eq(task),
+ EasyMock.anyLong(),
+ EasyMock.eq(TimeUnit.MILLISECONDS)
+ )).andReturn(null);
+ Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
+
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
+
+ taskLogs.pushTaskPayload(EasyMock.anyString(), EasyMock.anyObject());
+ replayAll();
+
+ TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, true);
+
+ verifyAll();
+ Assert.assertTrue(taskStatus.isSuccess());
+ Assert.assertEquals(ID, taskStatus.getId());
+ Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED,
peonLifecycle.getState());
+ }
+
+ @Test
+ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
@@ -159,12 +203,12 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
replayAll();
- peonLifecycle.run(job, 0L, 0L);
+ peonLifecycle.run(job, 0L, 0L, false);
Assert.assertThrows(
"Task [id] failed to run: invalid peon lifecycle state transition
[STOPPED]->[PENDING]",
IllegalStateException.class,
- () -> peonLifecycle.run(job, 0L, 0L)
+ () -> peonLifecycle.run(job, 0L, 0L, false)
);
verifyAll();
@@ -208,7 +252,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
Assert.assertThrows(
Exception.class,
- () -> peonLifecycle.run(job, 0L, 0L)
+ () -> peonLifecycle.run(job, 0L, 0L, false)
);
verifyAll();
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index e6b1b8006af..36a7b4cfcd9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -221,10 +221,12 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
TaskStatus taskStatus = TaskStatus.success(task.getId());
EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+
EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
EasyMock.expect(kubernetesPeonLifecycle.run(
EasyMock.eq(job),
EasyMock.anyLong(),
- EasyMock.anyLong()
+ EasyMock.anyLong(),
+ EasyMock.anyBoolean()
)).andReturn(taskStatus);
replayAll();
@@ -256,10 +258,12 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
.build();
EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+
EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
EasyMock.expect(kubernetesPeonLifecycle.run(
EasyMock.eq(job),
EasyMock.anyLong(),
- EasyMock.anyLong()
+ EasyMock.anyLong(),
+ EasyMock.anyBoolean()
)).andThrow(new IllegalStateException());
replayAll();
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 22e2311bbba..09816168588 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -129,7 +129,8 @@ public class DruidPeonClientIntegrationTest
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ null
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new
PeonCommandContext(Collections.singletonList(
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 19701e4f26e..c1100ccc29d 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -37,10 +37,13 @@ import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
@@ -52,6 +55,7 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesExecutor;
import
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
@@ -59,15 +63,23 @@ import
org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -82,6 +94,8 @@ class K8sTaskAdapterTest
private final TaskConfig taskConfig;
private final DruidNode node;
private final ObjectMapper jsonMapper;
+ private final TaskLogs taskLogs;
+
public K8sTaskAdapterTest()
{
@@ -105,6 +119,7 @@ class K8sTaskAdapterTest
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new
TaskConfigBuilder().setBaseDir("src/test/resources").build();
+ taskLogs = new NoopTaskLogs();
}
@Test
@@ -139,7 +154,9 @@ class K8sTaskAdapterTest
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
+
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.fromTask(task);
@@ -166,7 +183,8 @@ class K8sTaskAdapterTest
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
@@ -189,6 +207,169 @@ class K8sTaskAdapterTest
assertEquals(task, taskFromJob);
}
+ @Test
+ public void fromTask_dontSetTaskJSON() throws IOException
+ {
+ final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
+ TestKubernetesClient testClient = new TestKubernetesClient(client)
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T executeRequest(KubernetesExecutor<T> executor) throws
KubernetesResourceNotFoundException
+ {
+ return (T) new Pod()
+ {
+ @Override
+ public PodSpec getSpec()
+ {
+ return podSpec;
+ }
+ };
+ }
+ };
+
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int)
DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
+ );
+ Job job = adapter.fromTask(task);
+ // TASK_JSON should not be set in env variables
+ Assertions.assertFalse(
+ job.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0).getEnv()
+ .stream().anyMatch(env ->
env.getName().equals(DruidK8sConstants.TASK_JSON_ENV))
+ );
+
+ // --taskId <TASK_ID> should be passed to the peon command args
+ Assertions.assertTrue(
+ Arrays.stream(job.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0)
+ .getArgs()
+ .get(0).split(" ")).collect(Collectors.toSet())
+ .containsAll(ImmutableList.of("--taskId", task.getId()))
+ );
+ }
+
+ @Test
+ public void toTask_useTaskPayloadManager() throws IOException
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
+ Task taskInTaskPayloadManager = K8sTestUtils.getTask();
+ TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+
Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of(
+ new
ByteArrayInputStream(jsonMapper.writeValueAsString(taskInTaskPayloadManager).getBytes(Charset.defaultCharset()))
+ ));
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ mockTestLogs
+ );
+
+ Job job = new JobBuilder()
+ .editMetadata().withName("job").endMetadata()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().editSpec().addToContainers(new
ContainerBuilder().withName("main").build()).endSpec().endTemplate().endSpec().build();
+
+ Task taskFromJob = adapter.toTask(job);
+ assertEquals(taskInTaskPayloadManager, taskFromJob);
+ }
+
+ @Test
+ public void getTaskId()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config =
KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().endTemplate().endSpec().build();
+
+ assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
+ }
+
+ @Test
+ public void getTaskId_noAnnotations()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config =
KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void getTaskId_missingTaskIdAnnotation()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config =
KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
@Test
void testGrabbingTheLastXmxValueFromACommand()
{
@@ -282,7 +463,8 @@ class K8sTaskAdapterTest
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
Task task = K8sTestUtils.getTask();
// no monitor in overlord, no monitor override
@@ -305,7 +487,8 @@ class K8sTaskAdapterTest
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
adapter.addEnvironmentVariables(container, context, task.toString());
EnvVar env = container.getEnv()
@@ -322,7 +505,8 @@ class K8sTaskAdapterTest
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
container.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
@@ -347,13 +531,16 @@ class K8sTaskAdapterTest
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
.build();
- SingleContainerTaskAdapter adapter =
- new SingleContainerTaskAdapter(testClient,
- config, taskConfig,
- startupLoggingConfig,
- node,
- jsonMapper
- );
+
+ SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index ac6e32d140e..45ea0873376 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -41,6 +41,8 @@ import
org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,6 +60,7 @@ class MultiContainerTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode druidNode;
private ObjectMapper jsonMapper;
+ @Mock private TaskLogs taskLogs;
@BeforeEach
public void setup()
@@ -98,7 +101,8 @@ class MultiContainerTaskAdapterTest
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
@@ -146,7 +150,8 @@ class MultiContainerTaskAdapterTest
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
@@ -191,12 +196,16 @@ class MultiContainerTaskAdapterTest
.withPrimaryContainerName("primary")
.withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
.build();
- MultiContainerTaskAdapter adapter = new
MultiContainerTaskAdapter(testClient,
- config,
-
taskConfig,
-
startupLoggingConfig,
-
druidNode,
-
jsonMapper);
+
+ MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper,
+ taskLogs
+ );
NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, config.getPrimaryContainerName());
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index dd4eadfea29..74dfacd1a32 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -20,30 +20,39 @@
package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
+import org.easymock.Mock;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
@@ -51,6 +60,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class PodTemplateTaskAdapterTest
{
@TempDir private Path tempDir;
@@ -59,6 +70,7 @@ public class PodTemplateTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode node;
private ObjectMapper mapper;
+ @Mock private TaskLogs taskLogs;
@BeforeEach
public void setup()
@@ -89,7 +101,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- new Properties()
+ new Properties(),
+ taskLogs
));
}
@@ -109,7 +122,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
));
}
@@ -127,7 +141,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -159,7 +174,8 @@ public class PodTemplateTaskAdapterTest
true
),
mapper,
- props
+ props,
+ taskLogs
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -185,7 +201,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
));
}
@@ -204,7 +221,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -215,7 +233,41 @@ public class PodTemplateTaskAdapterTest
}
@Test
- public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
+ public void
test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON() throws
IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.noop",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int)
DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
+ );
+
+ Job actual = adapter.fromTask(task);
+ Job expected =
K8sTestUtils.fileToResource("expectedNoopJobNoTaskJson.yaml", Job.class);
+
+ Assertions.assertEquals(actual, expected);
+ }
+
+ @Test
+ public void test_fromTask_withoutAnnotations_throwsDruidException() throws
IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -228,17 +280,91 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml",
Job.class);
- Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+ Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
+ }
+
+ @Test
+ public void test_getTaskId() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().endTemplate().endSpec().build();
+
+ assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
}
@Test
- public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws
IOException
+ public void test_getTaskId_noAnnotations() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void test_getTaskId_missingTaskIdAnnotation() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void test_toTask_withoutTaskAnnotation_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -251,7 +377,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job baseJob =
K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@@ -265,11 +392,11 @@ public class PodTemplateTaskAdapterTest
.endTemplate()
.endSpec()
.build();
- Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+ Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
}
@Test
- public void test_fromTask() throws IOException
+ public void test_toTask() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -282,7 +409,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
@@ -292,6 +420,35 @@ public class PodTemplateTaskAdapterTest
Assertions.assertEquals(expected, actual);
}
+ @Test
+ public void test_toTask_useTaskPayloadManager() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.put("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ Task expected = new NoopTask("id", null, "datasource", 0, 0,
ImmutableMap.of());
+ TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+ Mockito.when(mockTestLogs.streamTaskPayload("id")).thenReturn(Optional.of(
+ new
ByteArrayInputStream(mapper.writeValueAsString(expected).getBytes(Charset.defaultCharset()))
+ ));
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ mockTestLogs
+ );
+
+ Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
+ Task actual = adapter.toTask(job);
+ Assertions.assertEquals(expected, actual);
+ }
+
@Test
public void test_fromTask_withRealIds() throws IOException
{
@@ -307,7 +464,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = new NoopTask(
@@ -340,7 +498,8 @@ public class PodTemplateTaskAdapterTest
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = EasyMock.mock(Task.class);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index 10e129a9c2b..43a40daedc1 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -39,6 +39,8 @@ import
org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -57,6 +59,8 @@ class SingleContainerTaskAdapterTest
private DruidNode druidNode;
private ObjectMapper jsonMapper;
+ @Mock private TaskLogs taskLogs;
+
@BeforeEach
public void setup()
{
@@ -96,7 +100,8 @@ class SingleContainerTaskAdapterTest
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index 0998d592fee..2cef837f397 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index bb8f64c5e5e..cf16c49c5db 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value:
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
similarity index 77%
copy from
extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
copy to
extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 0998d592fee..d72d0ef37b0 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -26,7 +26,6 @@ spec:
druid.task.group.id: "id"
druid.task.datasource: "datasource"
annotations:
- task:
"H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
tls.enabled: "false"
task.id: "id"
task.type: "noop"
@@ -42,10 +41,6 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- - name: "TASK_JSON"
- valueFrom:
- fieldRef:
- fieldPath: "metadata.annotations['task']"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
index e6762af63cf..a230ac913a6 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 53112d0808d..b9fd9e42862 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -83,6 +83,21 @@ public class S3TaskLogs implements TaskLogs
return streamTaskFileWithRetry(0, taskKey);
}
+ @Override
+ public void pushTaskPayload(String taskid, File taskPayloadFile) throws
IOException
+ {
+ final String taskKey = getTaskLogKey(taskid, "task.json");
+ log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile,
taskKey);
+ pushTaskFile(taskPayloadFile, taskKey);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskPayload(String taskid) throws
IOException
+ {
+ final String taskKey = getTaskLogKey(taskid, "task.json");
+ return streamTaskFileWithRetry(0, taskKey);
+ }
+
/**
* Using the retry conditions defined in {@link S3Utils#S3RETRY}.
*/
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 7428081aa3e..011dc488845 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -35,8 +35,11 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -55,6 +58,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
@@ -143,6 +147,78 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.verify(s3Client);
}
+ @Test
+ public void test_pushTaskPayload() throws IOException
+ {
+ Capture<PutObjectRequest> putObjectRequestCapture =
Capture.newInstance(CaptureType.FIRST);
+
EasyMock.expect(s3Client.putObject(EasyMock.capture(putObjectRequestCapture)))
+ .andReturn(new PutObjectResult())
+ .once();
+
+ EasyMock.replay(s3Client);
+
+ S3TaskLogsConfig config = new S3TaskLogsConfig();
+ config.setS3Bucket(TEST_BUCKET);
+ config.setS3Prefix("prefix");
+ config.setDisableAcl(true);
+
+ CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+ S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+ S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+
+ File payloadFile = tempFolder.newFile("task.json");
+ String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+ s3TaskLogs.pushTaskPayload(taskId, payloadFile);
+
+ PutObjectRequest putObjectRequest = putObjectRequestCapture.getValue();
+ Assert.assertEquals(TEST_BUCKET, putObjectRequest.getBucketName());
+ Assert.assertEquals("prefix/" + taskId + "/task.json",
putObjectRequest.getKey());
+ Assert.assertEquals(payloadFile, putObjectRequest.getFile());
+ EasyMock.verify(s3Client);
+ }
+
+ @Test
+ public void test_streamTaskPayload() throws IOException
+ {
+ String taskPayloadString = "task payload";
+
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(taskPayloadString.length());
+ EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyObject(),
EasyMock.anyObject()))
+ .andReturn(objectMetadata)
+ .once();
+
+ InputStream taskPayload = new
ByteArrayInputStream(taskPayloadString.getBytes(Charset.defaultCharset()));
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(taskPayload);
+ Capture<GetObjectRequest> getObjectRequestCapture =
Capture.newInstance(CaptureType.FIRST);
+
EasyMock.expect(s3Client.getObject(EasyMock.capture(getObjectRequestCapture)))
+ .andReturn(s3Object)
+ .once();
+
+ EasyMock.replay(s3Client);
+
+ S3TaskLogsConfig config = new S3TaskLogsConfig();
+ config.setS3Bucket(TEST_BUCKET);
+ config.setS3Prefix("prefix");
+ config.setDisableAcl(true);
+
+ CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+ S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+ S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+
+ String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+ Optional<InputStream> payloadResponse =
s3TaskLogs.streamTaskPayload(taskId);
+
+ GetObjectRequest getObjectRequest = getObjectRequestCapture.getValue();
+ Assert.assertEquals(TEST_BUCKET, getObjectRequest.getBucketName());
+ Assert.assertEquals("prefix/" + taskId + "/task.json",
getObjectRequest.getKey());
+ Assert.assertTrue(payloadResponse.isPresent());
+
+ Assert.assertEquals(taskPayloadString,
IOUtils.toString(payloadResponse.get(), Charset.defaultCharset()));
+ EasyMock.verify(s3Client);
+ }
+
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
index be2c62540e0..6e7df12f8d2 100644
---
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
+++
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
@@ -29,6 +29,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
+import org.apache.druid.tasklogs.TaskPayloadManager;
/**
*/
@@ -48,5 +49,6 @@ public class IndexingServiceTaskLogsModule implements Module
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
+ binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/error/InternalServerError.java
b/processing/src/main/java/org/apache/druid/error/InternalServerError.java
new file mode 100644
index 00000000000..b730acb0e3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/error/InternalServerError.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+public class InternalServerError extends DruidException.Failure
+{
+ public static DruidException exception(String errorCode, String msg,
Object... args)
+ {
+ return exception(null, errorCode, msg, args);
+ }
+ public static DruidException exception(Throwable t, String errorCode, String
msg, Object... args)
+ {
+ return DruidException.fromFailure(new InternalServerError(t, errorCode,
msg, args));
+ }
+
+ private final Throwable t;
+ private final String msg;
+ private final Object[] args;
+
+ private InternalServerError(
+ Throwable t,
+ String errorCode,
+ String msg,
+ Object... args
+ )
+ {
+ super(errorCode);
+ this.t = t;
+ this.msg = msg;
+ this.args = args;
+ }
+
+ @Override
+ public DruidException makeException(DruidException.DruidExceptionBuilder bob)
+ {
+ bob = bob.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE);
+
+ if (t == null) {
+ return bob.build(msg, args);
+ } else {
+ return bob.build(t, msg, args);
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index 287b2f6fcc3..5568a5160fd 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -64,4 +64,16 @@ public class NoopTaskLogs implements TaskLogs
{
log.info("Noop: No task logs are deleted.");
}
+
+ @Override
+ public void pushTaskPayload(String taskid, File taskPayloadFile)
+ {
+ log.info("Not pushing payload for task: %s", taskid);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskPayload(String taskid)
+ {
+ return Optional.absent();
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
index ee50217c957..2756911f6a3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java
@@ -21,7 +21,8 @@ package org.apache.druid.tasklogs;
import org.apache.druid.guice.annotations.ExtensionPoint;
+
@ExtensionPoint
-public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller
+public interface TaskLogs extends TaskLogStreamer, TaskLogPusher,
TaskLogKiller, TaskPayloadManager
{
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
new file mode 100644
index 00000000000..41db8e4556b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Something that knows how to push a task payload before it is run to
somewhere
+ * an ingestion worker will be able to stream the task payload from when trying
to run the task.
+ */
+@ExtensionPoint
+public interface TaskPayloadManager
+{
+ /**
+ * Save payload so it can be retrieved later.
+ *
+ * @param taskPayloadFile file containing the payload to save for the task identified by taskid
+ */
+ default void pushTaskPayload(String taskid, File taskPayloadFile) throws
IOException
+ {
+ throw new NotImplementedException(StringUtils.format("this
druid.indexer.logs.type [%s] does not support managing task payloads yet. You
will have to switch to using environment variables", getClass()));
+ }
+
+ /**
+ * Stream payload for a task.
+ *
+ * @return inputStream for this taskPayload, if available
+ */
+ default Optional<InputStream> streamTaskPayload(String taskid) throws
IOException
+ {
+ throw new NotImplementedException(StringUtils.format("this
druid.indexer.logs.type [%s] does not support managing task payloads yet. You
will have to switch to using environment variables", getClass()));
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java
b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java
new file mode 100644
index 00000000000..b28296b2c41
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+import org.apache.druid.matchers.DruidMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class InternalServerErrorTest
+{
+
+ @Test
+ public void testAsErrorResponse()
+ {
+ ErrorResponse errorResponse = new
ErrorResponse(InternalServerError.exception("runtimeFailure", "Internal Server
Error"));
+ final Map<String, Object> asMap = errorResponse.getAsMap();
+
+ MatcherAssert.assertThat(
+ asMap,
+ DruidMatchers.mapMatcher(
+ "error", "druidException",
+ "errorCode", "runtimeFailure",
+ "persona", "OPERATOR",
+ "category", "RUNTIME_FAILURE",
+ "errorMessage", "Internal Server Error"
+ )
+ );
+
+ ErrorResponse recomposed = ErrorResponse.fromMap(asMap);
+
+ MatcherAssert.assertThat(
+ recomposed.getUnderlyingException(),
+ new DruidExceptionMatcher(
+ DruidException.Persona.OPERATOR,
+ DruidException.Category.RUNTIME_FAILURE,
+ "runtimeFailure"
+ ).expectMessageContains("Internal Server Error")
+ );
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
index 30192932a9e..2da98dab303 100644
--- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
@@ -32,4 +32,11 @@ public class NoopTaskLogsTest
TaskLogs taskLogs = new NoopTaskLogs();
Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
}
+
+ @Test
+ public void test_streamTaskPayload() throws IOException
+ {
+ TaskLogs taskLogs = new NoopTaskLogs();
+ Assert.assertFalse(taskLogs.streamTaskPayload("id").isPresent());
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
similarity index 68%
copy from
processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
copy to
processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
index 30192932a9e..4c53061a86f 100644
--- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
+++
b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java
@@ -19,17 +19,25 @@
package org.apache.druid.tasklogs;
+import org.apache.commons.lang.NotImplementedException;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-
-public class NoopTaskLogsTest
+public class TaskPayloadManagerTest implements TaskPayloadManager
{
@Test
- public void test_streamTaskStatus() throws IOException
+ public void test_streamTaskPayload()
+ {
+ Assert.assertThrows(NotImplementedException.class,
+ () -> this.streamTaskPayload("id")
+ );
+ }
+
+ @Test
+ public void test_pushTaskPayload()
{
- TaskLogs taskLogs = new NoopTaskLogs();
- Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
+ Assert.assertThrows(NotImplementedException.class,
+ () -> this.pushTaskPayload("id", null)
+ );
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 50dc64a1e06..a18e40b74ac 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -40,6 +40,8 @@ import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import io.netty.util.SuppressForbidden;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
@@ -130,10 +132,12 @@ import
org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
+import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -177,6 +181,9 @@ public class CliPeon extends GuiceRunnable
@Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments",
description = "Enable loading of broadcast segments")
public String loadBroadcastSegments = "false";
+ @Option(name = "--taskId", title = "taskId", description = "TaskId for
fetching task.json remotely")
+ public String taskId = "";
+
private static final Logger log = new Logger(CliPeon.class);
private Properties properties;
@@ -286,9 +293,15 @@ public class CliPeon extends GuiceRunnable
@Provides
@LazySingleton
- public Task readTask(@Json ObjectMapper mapper,
ExecutorLifecycleConfig config)
+ public Task readTask(@Json ObjectMapper mapper,
ExecutorLifecycleConfig config, TaskPayloadManager taskPayloadManager)
{
try {
+ if (!config.getTaskFile().exists() ||
config.getTaskFile().length() == 0) {
+ log.info("Task file not found, trying to pull task payload
from deep storage");
+ String task =
IOUtils.toString(taskPayloadManager.streamTaskPayload(taskId).get(),
Charset.defaultCharset());
+ // write the remote task.json to the task file location for
ExecutorLifecycle to pick up
+ FileUtils.write(config.getTaskFile(), task,
Charset.defaultCharset());
+ }
return mapper.readValue(config.getTaskFile(), Task.class);
}
catch (IOException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]