This is an automated email from the ASF dual-hosted git repository.
davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d81d13b9ba Pod template task adapter (#13896)
d81d13b9ba is described below
commit d81d13b9ba5b81ed098230dd5fbf24f39ea5710f
Author: Nicholas Lippis <[email protected]>
AuthorDate: Wed Mar 22 16:20:24 2023 -0400
Pod template task adapter (#13896)
* Pod template task adapter
* Use getBaseTaskDirPaths
* Remove unused task from getEnv
* Use Optional.ifPresent() instead of Optional.map()
* Pass absolute path
* Don't pass task to getEnv
* Assert the correct adapter is created
* Javadocs and Comments
* Add exception message to assertions
---
.../druid/k8s/overlord/KubernetesTaskRunner.java | 88 +----
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 23 +-
.../k8s/overlord/common/DruidK8sConstants.java | 4 +
.../druid/k8s/overlord/common/K8sTaskAdapter.java | 100 +++++-
.../overlord/common/MultiContainerTaskAdapter.java | 14 +-
.../overlord/common/PodTemplateTaskAdapter.java | 259 ++++++++++++++
.../common/SingleContainerTaskAdapter.java | 12 +-
.../druid/k8s/overlord/common/TaskAdapter.java | 8 +-
.../overlord/KubernetesTaskRunnerFactoryTest.java | 146 ++++++++
.../k8s/overlord/KubernetesTaskRunnerTest.java | 75 +---
.../common/DruidPeonClientIntegrationTest.java | 55 ++-
.../k8s/overlord/common/K8sTaskAdapterTest.java | 54 ++-
.../common/MultiContainerTaskAdapterTest.java | 60 +++-
.../common/PodTemplateTaskAdapterTest.java | 395 +++++++++++++++++++++
.../common/SingleContainerTaskAdapterTest.java | 51 ++-
.../src/test/resources/basePod.yaml | 23 ++
.../src/test/resources/basePodTemplate.yaml | 12 +
.../test/resources/basePodWithoutAnnotations.yaml | 21 ++
.../src/test/resources/expectedNoopJob.yaml | 40 +++
.../test/resources/expectedNoopJobTlsEnabled.yaml | 40 +++
20 files changed, 1291 insertions(+), 189 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 44723db377..ae2dbbb885 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -35,38 +34,29 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.ForkingTaskRunner;
-import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
-import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
-import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -101,36 +91,28 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
private final ScheduledExecutorService cleanupExecutor;
protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new
ConcurrentHashMap<>();
- private final StartupLoggingConfig startupLoggingConfig;
- private final TaskAdapter<Pod, Job> adapter;
+ protected final TaskAdapter adapter;
private final KubernetesTaskRunnerConfig k8sConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
- private final DruidNode node;
- private final TaskConfig taskConfig;
public KubernetesTaskRunner(
- StartupLoggingConfig startupLoggingConfig,
- TaskAdapter<Pod, Job> adapter,
+ TaskAdapter adapter,
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
- KubernetesPeonClient client,
- DruidNode node,
- TaskConfig taskConfig
+ KubernetesPeonClient client
)
{
- this.startupLoggingConfig = startupLoggingConfig;
this.adapter = adapter;
this.k8sConfig = k8sConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.client = client;
- this.node = node;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
@@ -139,7 +121,6 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
"The task queue bounds how many concurrent k8s tasks you can have"
);
- this.taskConfig = taskConfig;
}
@@ -163,13 +144,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
JobResponse completedPhase;
Optional<Job> existingJob = client.jobExists(k8sTaskId);
if (!existingJob.isPresent()) {
- PeonCommandContext context = new PeonCommandContext(
- generateCommand(task),
- javaOpts(task),
- new File(taskConfig.getBaseTaskDirPaths().get(0)),
- node.isEnableTlsPort()
- );
- Job job = adapter.fromTask(task, context);
+ Job job = adapter.fromTask(task);
log.info("Job created %s and ready to launch", k8sTaskId);
Pod peonPod = client.launchJobAndWaitForStart(
job,
@@ -313,61 +288,6 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
return ImmutableMap.of("taskQueue", (long) taskQueueConfig.getMaxSize());
}
- private List<String> javaOpts(Task task)
- {
- final List<String> javaOpts = new ArrayList<>();
- Iterables.addAll(javaOpts, k8sConfig.javaOptsArray);
-
- // Override task specific javaOpts
- Object taskJavaOpts = task.getContextValue(
- ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
- );
- if (taskJavaOpts != null) {
- Iterables.addAll(
- javaOpts,
- new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
- );
- }
-
- javaOpts.add(StringUtils.format("-Ddruid.port=%d",
DruidK8sConstants.PORT));
- javaOpts.add(StringUtils.format("-Ddruid.plaintextPort=%d",
DruidK8sConstants.PORT));
- javaOpts.add(StringUtils.format("-Ddruid.tlsPort=%d",
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
- javaOpts.add(StringUtils.format(
- "-Ddruid.task.executor.tlsPort=%d",
- node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
- ));
- javaOpts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s",
node.isEnableTlsPort())
- );
- return javaOpts;
- }
-
- private List<String> generateCommand(Task task)
- {
- final List<String> command = new ArrayList<>();
- command.add("/peon.sh");
- command.add(taskConfig.getBaseTaskDirPaths().get(0));
- command.add(task.getId());
- command.add("1"); // the attemptId is always 1, we never run the task
twice on the same pod.
-
- String nodeType = task.getNodeType();
- if (nodeType != null) {
- command.add("--nodeType");
- command.add(nodeType);
- }
-
- // If the task type is queryable, we need to load broadcast segments on
the peon, used for
- // join queries
- if (task.supportsQueries()) {
- command.add("--loadBroadcastSegments");
- command.add("true");
- }
- log.info(
- "Peon Command for K8s job: %s",
-
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(),
command)
- );
- return command;
- }
-
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 759752753b..d143eb6840 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -86,20 +86,31 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
K8sTaskAdapter adapter;
if (kubernetesTaskRunnerConfig.sidecarSupport) {
- adapter = new MultiContainerTaskAdapter(client,
kubernetesTaskRunnerConfig, smileMapper);
+ adapter = new MultiContainerTaskAdapter(
+ client,
+ kubernetesTaskRunnerConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper
+ );
} else {
- adapter = new SingleContainerTaskAdapter(client,
kubernetesTaskRunnerConfig, smileMapper);
+ adapter = new SingleContainerTaskAdapter(
+ client,
+ kubernetesTaskRunnerConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper
+ );
}
runner = new KubernetesTaskRunner(
- startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- new DruidKubernetesPeonClient(client,
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
- druidNode,
- taskConfig
+ new DruidKubernetesPeonClient(client,
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs)
);
return runner;
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index abdb74ee0c..a43c9b908f 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -23,7 +23,11 @@ import com.google.common.base.Predicate;
public class DruidK8sConstants
{
+ public static final String TASK = "task";
public static final String TASK_ID = "task.id";
+ public static final String TASK_TYPE = "task.type";
+ public static final String TASK_GROUP_ID = "task.group.id";
+ public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
public static final String TLS_ENABLED = "tls.enabled";
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
index b067211399..bfc87d7c86 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
@@ -40,12 +40,20 @@ import
io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.lang3.StringUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ForkingTaskRunner;
+import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
+import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -63,33 +71,48 @@ import java.util.Optional;
* to add some extra coordination to shut down sidecar containers when the
main pod exits.
*/
-public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
+public abstract class K8sTaskAdapter implements TaskAdapter
{
private static final EmittingLogger log = new
EmittingLogger(K8sTaskAdapter.class);
protected final KubernetesClientApi client;
- protected final KubernetesTaskRunnerConfig config;
+ protected final KubernetesTaskRunnerConfig taskRunnerConfig;
+ protected final TaskConfig taskConfig;
+ protected final StartupLoggingConfig startupLoggingConfig;
+ protected final DruidNode node;
protected final ObjectMapper mapper;
public K8sTaskAdapter(
KubernetesClientApi client,
- KubernetesTaskRunnerConfig config,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode node,
ObjectMapper mapper
)
{
this.client = client;
- this.config = config;
+ this.taskRunnerConfig = taskRunnerConfig;
+ this.taskConfig = taskConfig;
+ this.startupLoggingConfig = startupLoggingConfig;
+ this.node = node;
this.mapper = mapper;
}
@Override
- public Job fromTask(Task task, PeonCommandContext context) throws IOException
+ public Job fromTask(Task task) throws IOException
{
String myPodName = System.getenv("HOSTNAME");
- Pod pod = client.executeRequest(client ->
client.pods().inNamespace(config.namespace).withName(myPodName).get());
+ Pod pod = client.executeRequest(client ->
client.pods().inNamespace(taskRunnerConfig.namespace).withName(myPodName).get());
+ PeonCommandContext context = new PeonCommandContext(
+ generateCommand(task),
+ javaOpts(task),
+ new File(taskConfig.getBaseTaskDirPaths().get(0)),
+ node.isEnableTlsPort()
+ );
PodSpec podSpec = pod.getSpec();
- massageSpec(podSpec, config.primaryContainerName);
+ massageSpec(podSpec, taskRunnerConfig.primaryContainerName);
return createJobFromPodSpec(podSpec, task, context);
}
@@ -125,9 +148,9 @@ public abstract class K8sTaskAdapter implements
TaskAdapter<Pod, Job>
.endMetadata()
.withNewSpec()
.withTemplate(podTemplate)
-
.withActiveDeadlineSeconds(config.maxTaskDuration.toStandardDuration().getStandardSeconds())
+
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
.withBackoffLimit(0)
- .withTtlSecondsAfterFinished((int)
config.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+ .withTtlSecondsAfterFinished((int)
taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
.endSpec()
.build();
}
@@ -245,7 +268,7 @@ public abstract class K8sTaskAdapter implements
TaskAdapter<Pod, Job>
protected Map<String, String> addJobSpecificAnnotations(PeonCommandContext
context, K8sTaskId k8sTaskId)
{
- Map<String, String> annotations = config.annotations;
+ Map<String, String> annotations = taskRunnerConfig.annotations;
annotations.put(DruidK8sConstants.TASK_ID, k8sTaskId.getOriginalTaskId());
annotations.put(DruidK8sConstants.TLS_ENABLED,
String.valueOf(context.isEnableTls()));
return annotations;
@@ -253,7 +276,7 @@ public abstract class K8sTaskAdapter implements
TaskAdapter<Pod, Job>
protected Map<String, String> addJobSpecificLabels()
{
- Map<String, String> labels = config.labels;
+ Map<String, String> labels = taskRunnerConfig.labels;
labels.put(DruidK8sConstants.LABEL_KEY, "true");
return labels;
}
@@ -269,7 +292,7 @@ public abstract class K8sTaskAdapter implements
TaskAdapter<Pod, Job>
podSpec.setNodeName(null);
podSpec.setRestartPolicy("Never");
podSpec.setHostname(k8sTaskId.getK8sTaskId());
-
podSpec.setTerminationGracePeriodSeconds(config.graceTerminationPeriodSeconds);
+
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.graceTerminationPeriodSeconds);
PodTemplateSpec podTemplate = new PodTemplateSpec();
ObjectMeta objectMeta = new ObjectMeta();
@@ -304,4 +327,57 @@ public abstract class K8sTaskAdapter implements
TaskAdapter<Pod, Job>
}
}
+ private List<String> javaOpts(Task task)
+ {
+ final List<String> javaOpts = new ArrayList<>();
+ Iterables.addAll(javaOpts, taskRunnerConfig.javaOptsArray);
+
+ // Override task specific javaOpts
+ Object taskJavaOpts = task.getContextValue(
+ ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+ );
+ if (taskJavaOpts != null) {
+ Iterables.addAll(
+ javaOpts,
+ new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+ );
+ }
+
+
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d",
DruidK8sConstants.PORT));
+
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d",
DruidK8sConstants.PORT));
+
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d",
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
+ javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
+ "-Ddruid.task.executor.tlsPort=%d",
+ node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
+ ));
+
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s",
node.isEnableTlsPort())
+ );
+ return javaOpts;
+ }
+
+ private List<String> generateCommand(Task task)
+ {
+ final List<String> command = new ArrayList<>();
+ command.add("/peon.sh");
+ command.add(new
File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath());
+ command.add("1"); // the attemptId is always 1, we never run the task
twice on the same pod.
+
+ String nodeType = task.getNodeType();
+ if (nodeType != null) {
+ command.add("--nodeType");
+ command.add(nodeType);
+ }
+
+ // If the task type is queryable, we need to load broadcast segments on
the peon, used for
+ // join queries
+ if (task.supportsQueries()) {
+ command.add("--loadBroadcastSegments");
+ command.add("true");
+ }
+ log.info(
+ "Peon Command for K8s job: %s",
+
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(),
command)
+ );
+ return command;
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
index df890ede6d..f17dbaaa0a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java
@@ -34,8 +34,11 @@ import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.IOException;
import java.util.Collections;
@@ -44,13 +47,18 @@ import java.util.Map;
public class MultiContainerTaskAdapter extends K8sTaskAdapter
{
+ public static String TYPE = "MultiContainer";
+
public MultiContainerTaskAdapter(
KubernetesClientApi client,
- KubernetesTaskRunnerConfig config,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode druidNode,
ObjectMapper mapper
)
{
- super(client, config, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper);
}
@Override
@@ -87,7 +95,7 @@ public class MultiContainerTaskAdapter extends K8sTaskAdapter
{
return new ContainerBuilder()
.withName("kubexit")
- .withImage(config.kubexitImage)
+ .withImage(taskRunnerConfig.kubexitImage)
.withCommand("cp", "/bin/kubexit", "/kubexit/kubexit")
.withVolumeMounts(new
VolumeMountBuilder().withMountPath("/kubexit").withName("kubexit").build())
.build();
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
new file mode 100644
index 0000000000..0c8b8108de
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A PodTemplate {@link TaskAdapter} to transform tasks to kubernetes jobs and
kubernetes pods to tasks
+ *
+ * Pod Templates
+ * This TaskAdapter allows the user to provide a pod template per druid task.
If a pod template has
+ * not been provided for a task, then the provided base template will be used.
+ *
+ * Providing Pod Templates per Task
+ * Pod templates are provided as files, each pod template file path must be
specified as a runtime property
+ * druid.indexer.runner.k8s.podTemplate.{task_name}=/path/to/podTemplate.yaml.
+ *
+ * Note that the base pod template must be specified as the runtime property
+ * druid.indexer.runner.k8s.podTemplate.base=/path/to/podTemplate.yaml
+ */
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+ public static String TYPE = "PodTemplate";
+
+ private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+ private static final String TASK_PROPERTY =
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX +
".k8s.podTemplate.%s";
+
+ private final KubernetesClientApi client;
+ private final KubernetesTaskRunnerConfig taskRunnerConfig;
+ private final TaskConfig taskConfig;
+ private final DruidNode node;
+ private final ObjectMapper mapper;
+ private final HashMap<String, PodTemplate> templates;
+
+ public PodTemplateTaskAdapter(
+ KubernetesClientApi client,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ DruidNode node,
+ ObjectMapper mapper,
+ Properties properties
+ )
+ {
+ this.client = client;
+ this.taskRunnerConfig = taskRunnerConfig;
+ this.taskConfig = taskConfig;
+ this.node = node;
+ this.mapper = mapper;
+ this.templates = initializePodTemplates(properties);
+ }
+
+ /**
+ * Create a {@link Job} from a {@link Task}
+ *
+ * 1. Select pod template based on task type
+ * 2. Add labels and annotations to the pod template including the task as a
compressed and base64 encoded string
+ * 3. Add labels and annotations to the job
+ * 4. Add user specified active deadline seconds and job ttl
+ * 5. Set backoff limit to zero since druid does not support external
systems retrying failed tasks
+ *
+ * @param task
+ * @return {@link Job}
+ * @throws IOException
+ */
+ @Override
+ public Job fromTask(Task task) throws IOException
+ {
+ PodTemplate podTemplate = templates.getOrDefault(task.getType(),
templates.get("base"));
+ if (podTemplate == null) {
+ throw new ISE("Pod template spec not found for task type [%s]",
task.getType());
+ }
+
+ return new JobBuilder()
+ .withNewMetadata()
+ .withName(new K8sTaskId(task).getK8sTaskId())
+ .addToLabels(getJobLabels(taskRunnerConfig))
+ .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+ .endMetadata()
+ .withNewSpec()
+ .withTemplate(podTemplate.getTemplate())
+ .editTemplate()
+ .editOrNewMetadata()
+ .addToAnnotations(getPodTemplateAnnotations(task))
+ .addToLabels(getPodLabels(taskRunnerConfig))
+ .endMetadata()
+ .editSpec()
+ .editFirstContainer()
+ .addAllToEnv(getEnv())
+ .endContainer()
+ .endSpec()
+ .endTemplate()
+
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
+ .withBackoffLimit(0) // druid does not support an external system
retrying failed tasks
+ .withTtlSecondsAfterFinished((int)
taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+ .endSpec()
+ .build();
+ }
+
+ /**
+ * Transform a {@link Pod} to a {@link Task}
+ *
+ * 1. Find task annotation on the pod
+ * 2. Base 64 decode and decompress task, read into {@link Task}
+ *
+ * @param from
+ * @return {@link Task}
+ * @throws IOException
+ */
+ @Override
+ public Task toTask(Pod from) throws IOException
+ {
+ Map<String, String> annotations = from.getMetadata().getAnnotations();
+ if (annotations == null) {
+ throw new IOE("No annotations found on pod [%s]",
from.getMetadata().getName());
+ }
+ String task = annotations.get(DruidK8sConstants.TASK);
+ if (task == null) {
+ throw new IOE("No task annotation found on pod [%s]",
from.getMetadata().getName());
+ }
+ return mapper.readValue(Base64Compression.decompressBase64(task),
Task.class);
+ }
+
+ private HashMap<String, PodTemplate> initializePodTemplates(Properties
properties)
+ {
+ HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
+ Optional<PodTemplate> basePodTemplate = loadPodTemplate("base",
properties);
+ if (!basePodTemplate.isPresent()) {
+ throw new IAE("Pod template task adapter requires a base pod template to
be specified");
+ }
+ podTemplateMap.put("base", basePodTemplate.get());
+
+ MapperConfig config = mapper.getDeserializationConfig();
+ AnnotatedClass cls =
AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class);
+ Collection<NamedType> taskSubtypes =
mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls);
+ for (NamedType namedType : taskSubtypes) {
+ String taskType = namedType.getName();
+ Optional<PodTemplate> template = loadPodTemplate(taskType, properties);
+ template.ifPresent(podTemplate -> podTemplateMap.put(taskType,
podTemplate));
+ }
+ return podTemplateMap;
+ }
+
+ private Optional<PodTemplate> loadPodTemplate(String key, Properties
properties)
+ {
+ String property = StringUtils.format(TASK_PROPERTY, key);
+ String podTemplateFile = properties.getProperty(property);
+ if (podTemplateFile == null) {
+ log.debug("Pod template file not specified for [%s]", key);
+ return Optional.empty();
+ }
+ try {
+ return Optional.of(client.executeRequest(client ->
client.v1().podTemplates().load(new File(podTemplateFile)).get()));
+ }
+ catch (Exception e) {
+ throw new ISE(e, "Failed to load pod template file for [%s] at [%s]",
property, podTemplateFile);
+ }
+ }
+
+ private Collection<EnvVar> getEnv()
+ {
+ return ImmutableList.of(
+ new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_DIR_ENV)
+ .withValue(new
File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath())
+ .build(),
+ new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_JSON_ENV)
+ .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new
ObjectFieldSelector(
+ null,
+ StringUtils.format("metadata.annotations['%s']",
DruidK8sConstants.TASK)
+ )).build()).build()
+ );
+ }
+
+ private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig config)
+ {
+ return getJobLabels(config);
+ }
+
+ private Map<String, String> getPodTemplateAnnotations(Task task) throws
IOException
+ {
+ return ImmutableMap.<String, String>builder()
+ .put(DruidK8sConstants.TASK,
Base64Compression.compressBase64(mapper.writeValueAsString(task)))
+ .put(DruidK8sConstants.TLS_ENABLED,
String.valueOf(node.isEnableTlsPort()))
+ .put(DruidK8sConstants.TASK_ID, task.getId())
+ .put(DruidK8sConstants.TASK_TYPE, task.getType())
+ .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
+ .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
+ .build();
+ }
+
+ private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config)
+ {
+ return ImmutableMap.<String, String>builder()
+ .putAll(config.labels)
+ .put(DruidK8sConstants.LABEL_KEY, "true")
+ .build();
+ }
+
+ private Map<String, String> getJobAnnotations(KubernetesTaskRunnerConfig
config, Task task)
+ {
+ return ImmutableMap.<String, String>builder()
+ .putAll(config.annotations)
+ .put(DruidK8sConstants.TASK_ID, task.getId())
+ .put(DruidK8sConstants.TASK_TYPE, task.getType())
+ .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
+ .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
+ .build();
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
index c6e9d049f6..d4da2987b5 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java
@@ -23,8 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.IOException;
import java.util.Collections;
@@ -32,13 +35,18 @@ import java.util.Map;
public class SingleContainerTaskAdapter extends K8sTaskAdapter
{
+ public static String TYPE = "SingleContainer";
+
public SingleContainerTaskAdapter(
KubernetesClientApi client,
- KubernetesTaskRunnerConfig config,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode druidNode,
ObjectMapper mapper
)
{
- super(client, config, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig,
druidNode, mapper);
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
index 5033f4af12..00263fdc3b 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java
@@ -19,15 +19,17 @@
package org.apache.druid.k8s.overlord.common;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.task.Task;
import java.io.IOException;
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
{
- V fromTask(Task task, PeonCommandContext context) throws IOException;
+ Job fromTask(Task task) throws IOException;
- Task toTask(K from) throws IOException;
+ Task toTask(Pod from) throws IOException;
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
new file mode 100644
index 0000000000..4c3a45706d
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerFactoryTest
+{
+ private ObjectMapper objectMapper;
+ private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+ private StartupLoggingConfig startupLoggingConfig;
+ private TaskQueueConfig taskQueueConfig;
+ private TaskLogPusher taskLogPusher;
+ private DruidNode druidNode;
+ private TaskConfig taskConfig;
+
+ @Before
+ public void setup()
+ {
+ objectMapper = new TestUtils().getTestObjectMapper();
+ kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
+ startupLoggingConfig = new StartupLoggingConfig();
+ taskQueueConfig = new TaskQueueConfig(
+ 1,
+ null,
+ null,
+ null
+ );
+ taskLogPusher = new NoopTaskLogs();
+ druidNode = new DruidNode(
+ "test",
+ "",
+ false,
+ 0,
+ 1,
+ true,
+ false
+ );
+ taskConfig = new TaskConfig(
+ "/tmp",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("/tmp")
+ );
+ }
+
+ @Test
+ public void test_get_returnsSameKuberentesTaskRunner_asBuild()
+ {
+ KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+ objectMapper,
+ kubernetesTaskRunnerConfig,
+ startupLoggingConfig,
+ taskQueueConfig,
+ taskLogPusher,
+ druidNode,
+ taskConfig
+ );
+
+ KubernetesTaskRunner expectedRunner = factory.build();
+ KubernetesTaskRunner actualRunner = factory.get();
+
+ Assert.assertEquals(expectedRunner, actualRunner);
+ }
+
+ @Test
+ public void
test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
+ {
+ KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+ objectMapper,
+ kubernetesTaskRunnerConfig,
+ startupLoggingConfig,
+ taskQueueConfig,
+ taskLogPusher,
+ druidNode,
+ taskConfig
+ );
+
+ KubernetesTaskRunner runner = factory.build();
+
+ Assert.assertNotNull(runner);
+ Assert.assertTrue(runner.adapter instanceof SingleContainerTaskAdapter);
+ }
+
+ @Test
+ public void
test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
+ {
+ kubernetesTaskRunnerConfig.sidecarSupport = true;
+
+ KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+ objectMapper,
+ kubernetesTaskRunnerConfig,
+ startupLoggingConfig,
+ taskQueueConfig,
+ taskLogPusher,
+ druidNode,
+ taskConfig
+ );
+
+ KubernetesTaskRunner runner = factory.build();
+
+ Assert.assertNotNull(runner);
+ Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index a4d8ae6360..6914f79d13 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -53,7 +52,6 @@ import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskAdapter;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
-import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
@@ -83,16 +81,15 @@ import static org.mockito.Mockito.when;
public class KubernetesTaskRunnerTest
{
-
private TaskQueueConfig taskQueueConfig;
private StartupLoggingConfig startupLoggingConfig;
private ObjectMapper jsonMapper;
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
- private TaskConfig taskConfig;
private TaskLogPusher taskLogPusher;
- private DruidNode node;
+ private DruidNode druidNode;
- public KubernetesTaskRunnerTest()
+ @Before
+ public void setUp()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@@ -103,36 +100,14 @@ public class KubernetesTaskRunnerTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
- }
-
- @Before
- public void setUp()
- {
- taskConfig = new TaskConfig(
- "src/test/resources",
- "src/test/resources",
- null,
- null,
- null,
- false,
- null,
- null,
- null,
- false,
- false,
- null,
- null,
- false,
- null
- );
kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
kubernetesTaskRunnerConfig.namespace = "test";
kubernetesTaskRunnerConfig.javaOptsArray =
Collections.singletonList("-Xmx2g");
taskQueueConfig = new TaskQueueConfig(1, Period.millis(1),
Period.millis(1), Period.millis(1));
startupLoggingConfig = new StartupLoggingConfig();
taskLogPusher = mock(TaskLogPusher.class);
- node = mock(DruidNode.class);
- when(node.isEnableTlsPort()).thenReturn(false);
+ druidNode = mock(DruidNode.class);
+ when(druidNode.isEnableTlsPort()).thenReturn(false);
}
@Test
@@ -157,7 +132,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
- when(adapter.fromTask(eq(task), any())).thenReturn(job);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
@@ -171,14 +146,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- peonClient,
- node,
- taskConfig
+ peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@@ -213,7 +185,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
- when(adapter.fromTask(eq(task),
isA(PeonCommandContext.class))).thenReturn(job);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
@@ -228,14 +200,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- peonClient,
- node,
- taskConfig
+ peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@@ -249,7 +218,7 @@ public class KubernetesTaskRunnerTest
peonPod.getStatus().getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
- node.isEnableTlsPort()
+ druidNode.isEnableTlsPort()
);
verify(spyRunner, times(1)).updateStatus(eq(task),
eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
}
@@ -279,7 +248,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
- when(adapter.fromTask(eq(task),
isA(PeonCommandContext.class))).thenReturn(job);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
@@ -296,14 +265,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- peonClient,
- node,
- taskConfig
+ peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems =
spyRunner.getKnownTasks();
@@ -344,7 +310,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
- when(adapter.fromTask(eq(task),
isA(PeonCommandContext.class))).thenReturn(job);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
@@ -361,14 +327,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- peonClient,
- node,
- taskConfig
+ peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems =
spyRunner.getKnownTasks();
@@ -404,14 +367,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- startupLoggingConfig,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- peonClient,
- node,
- taskConfig
+ peonClient
);
RunnerTaskState state = taskRunner.getRunnerTaskState("foo");
@@ -434,14 +394,11 @@ public class KubernetesTaskRunnerTest
Period.millis(1)
);
assertThrows(IllegalArgumentException.class, () -> new
KubernetesTaskRunner(
- startupLoggingConfig,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
- mock(DruidKubernetesPeonClient.class),
- node,
- taskConfig
+ mock(DruidKubernetesPeonClient.class)
));
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
index 72acaf42b1..72cd8d41ec 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -29,10 +30,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -56,11 +61,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
// must have a kind / minikube cluster installed and the image pushed to your
repository
public class DruidPeonClientIntegrationTest
{
- private final KubernetesClientApi k8sClient;
- private final DruidKubernetesPeonClient peonClient;
- private final ObjectMapper jsonMapper;
-
- public DruidPeonClientIntegrationTest()
+ private StartupLoggingConfig startupLoggingConfig;
+ private TaskConfig taskConfig;
+ private DruidNode druidNode;
+ private KubernetesClientApi k8sClient;
+ private DruidKubernetesPeonClient peonClient;
+ private ObjectMapper jsonMapper;
+
+ @BeforeEach
+ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@@ -73,6 +82,33 @@ public class DruidPeonClientIntegrationTest
);
k8sClient = new DruidKubernetesClient();
peonClient = new DruidKubernetesPeonClient(k8sClient, "default", false);
+ druidNode = new DruidNode(
+ "test",
+ null,
+ false,
+ null,
+ null,
+ true,
+ false
+ );
+ startupLoggingConfig = new StartupLoggingConfig();
+ taskConfig = new TaskConfig(
+ "src/test/resources",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("src/test/resources")
+ );
}
@Disabled
@@ -84,7 +120,14 @@ public class DruidPeonClientIntegrationTest
Task task = K8sTestUtils.getTask();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "default";
- K8sTaskAdapter adapter = new SingleContainerTaskAdapter(k8sClient, config,
jsonMapper);
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ k8sClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper
+ );
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new
PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
index 59ef5b430c..fe9775868e 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
@@ -36,11 +37,14 @@ import
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -58,8 +62,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@EnableKubernetesMockClient(crud = true)
class K8sTaskAdapterTest
{
- KubernetesClient client;
+ private KubernetesClient client;
+ private final StartupLoggingConfig startupLoggingConfig;
+ private final TaskConfig taskConfig;
+ private final DruidNode node;
private ObjectMapper jsonMapper;
public K8sTaskAdapterTest()
@@ -73,6 +80,33 @@ class K8sTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
+ node = new DruidNode(
+ "test",
+ null,
+ false,
+ null,
+ null,
+ true,
+ false
+ );
+ startupLoggingConfig = new StartupLoggingConfig();
+ taskConfig = new TaskConfig(
+ "src/test/resources",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("src/test/resources")
+ );
}
@Test
@@ -83,7 +117,14 @@ class K8sTaskAdapterTest
config.namespace = "test";
config.annotations.put("annotation_key", "annotation_value");
config.labels.put("label_key", "label_value");
- K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient,
config, jsonMapper);
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper
+ );
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
@@ -105,7 +146,14 @@ class K8sTaskAdapterTest
TestKubernetesClient testClient = new TestKubernetesClient(client);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
- K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient,
config, jsonMapper);
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper
+ );
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
index f1c7b390de..cba4051873 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -29,11 +30,15 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -44,12 +49,14 @@ import java.util.Collections;
@EnableKubernetesMockClient(crud = true)
class MultiContainerTaskAdapterTest
{
-
- KubernetesClient client;
-
+ private KubernetesClient client;
+ private StartupLoggingConfig startupLoggingConfig;
+ private TaskConfig taskConfig;
+ private DruidNode druidNode;
private ObjectMapper jsonMapper;
- public MultiContainerTaskAdapterTest()
+ @BeforeEach
+ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@@ -60,6 +67,33 @@ class MultiContainerTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
+ druidNode = new DruidNode(
+ "test",
+ null,
+ false,
+ null,
+ null,
+ true,
+ false
+ );
+ startupLoggingConfig = new StartupLoggingConfig();
+ taskConfig = new TaskConfig(
+ "src/test/resources",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("src/test/resources")
+ );
}
@Test
@@ -69,7 +103,14 @@ class MultiContainerTaskAdapterTest
Pod pod =
client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
- MultiContainerTaskAdapter adapter = new
MultiContainerTaskAdapter(testClient, config, jsonMapper);
+ MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper
+ );
NoopTask task = NoopTask.create("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
@@ -113,7 +154,14 @@ class MultiContainerTaskAdapterTest
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.primaryContainerName = "primary";
- MultiContainerTaskAdapter adapter = new
MultiContainerTaskAdapter(testClient, config, jsonMapper);
+ MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper
+ );
NoopTask task = NoopTask.create("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary");
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
new file mode 100644
index 0000000000..7c9dad4607
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Properties;
+
+@EnableKubernetesMockClient()
+public class PodTemplateTaskAdapterTest
+{
+ @TempDir private Path tempDir;
+ private KubernetesClient client;
+ private KubernetesTaskRunnerConfig taskRunnerConfig;
+ private TestKubernetesClient testClient;
+ private PodTemplate podTemplateSpec;
+ private TaskConfig taskConfig;
+ private DruidNode node;
+ private ObjectMapper mapper;
+
+ @BeforeEach
+ public void setup()
+ {
+ taskRunnerConfig = new KubernetesTaskRunnerConfig();
+ testClient = new TestKubernetesClient(client);
+ taskConfig = new TaskConfig(
+ "/tmp",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("/tmp")
+ );
+ node = new DruidNode(
+ "test",
+ "",
+ false,
+ 0,
+ 1,
+ true,
+ false
+ );
+ mapper = new TestUtils().getTestObjectMapper();
+ podTemplateSpec = client
+ .v1()
+ .podTemplates()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("basePodTemplate.yaml")
+ )
+ .get();
+ }
+
+ @Test
+ public void
test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
+ {
+ Assert.assertThrows(
+ "Pod template task adapter requires a base pod template to be
specified",
+ IAE.class,
+ () -> new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ new Properties()
+ ));
+ }
+
+ @Test
+ public void
test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_raisesISE()
throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("empty.yaml"));
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ Assert.assertThrows(
+ "Pod template task adapter requires a base pod template to be
specified",
+ ISE.class,
+ () -> new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ ));
+ }
+
+ @Test
+ public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws
IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ );
+
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ null,
+ null,
+ null
+ );
+ Job actual = adapter.fromTask(task);
+ Job expected = client
+ .batch()
+ .v1()
+ .jobs()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("expectedNoopJob.yaml")
+ )
+ .get();
+
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void
test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled() throws
IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ new DruidNode(
+ "test",
+ "",
+ false,
+ 0,
+ 1,
+ false,
+ true
+ ),
+ mapper,
+ props
+ );
+
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ null,
+ null,
+ null
+ );
+
+ Job actual = adapter.fromTask(task);
+ Job expected = client
+ .batch()
+ .v1()
+ .jobs()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("expectedNoopJobTlsEnabled.yaml")
+ )
+ .get();
+
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void
test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_raisesISE()
throws IOException
+ {
+ Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ Path noopTemplatePath = Files.createFile(tempDir.resolve("noop.yaml"));
+ mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
baseTemplatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.noop",
noopTemplatePath.toString());
+
+ Assert.assertThrows(ISE.class, () -> new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ ));
+ }
+
+ @Test
+ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws
IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.noop",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ );
+
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ null,
+ null,
+ null
+ );
+
+ Job actual = adapter.fromTask(task);
+ Job expected = client
+ .batch()
+ .v1()
+ .jobs()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("expectedNoopJob.yaml")
+ )
+ .get();
+
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ );
+
+ Pod pod = client
+ .pods()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("basePodWithoutAnnotations.yaml")
+ )
+ .get();
+
+ Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
+ }
+
+ @Test
+ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws
IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.put("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ );
+
+ Pod basePod = client
+ .pods()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("basePodWithoutAnnotations.yaml")
+ )
+ .get();
+
+ Pod pod = new PodBuilder(basePod)
+ .editMetadata()
+ .addToAnnotations(Collections.emptyMap())
+ .endMetadata()
+ .build();
+
+ Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
+ }
+
+ @Test
+ public void test_fromTask() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.put("druid.indexer.runner.k8s.podTemplate.base",
templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ testClient,
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props
+ );
+
+ Pod pod = client
+ .pods()
+ .load(this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("basePod.yaml")
+ )
+ .get();
+
+ Task actual = adapter.toTask(pod);
+ Task expected = NoopTask.create("id", 1);
+
+ Assertions.assertEquals(expected, actual);
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
index c442b13f6e..9005aa086a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
@@ -22,17 +22,22 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -43,12 +48,14 @@ import java.util.Collections;
@EnableKubernetesMockClient(crud = true)
class SingleContainerTaskAdapterTest
{
-
- KubernetesClient client;
-
+ private KubernetesClient client;
+ private StartupLoggingConfig startupLoggingConfig;
+ private TaskConfig taskConfig;
+ private DruidNode druidNode;
private ObjectMapper jsonMapper;
- public SingleContainerTaskAdapterTest()
+ @BeforeEach
+ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@@ -59,6 +66,33 @@ class SingleContainerTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
+ druidNode = new DruidNode(
+ "test",
+ null,
+ false,
+ null,
+ null,
+ true,
+ false
+ );
+ startupLoggingConfig = new StartupLoggingConfig();
+ taskConfig = new TaskConfig(
+ "src/test/resources",
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ ImmutableList.of("src/test/resources")
+ );
}
@Test
@@ -70,7 +104,14 @@ class SingleContainerTaskAdapterTest
.get();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
- SingleContainerTaskAdapter adapter = new
SingleContainerTaskAdapter(testClient, config, jsonMapper);
+ SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper
+ );
NoopTask task = NoopTask.create("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml
new file mode 100644
index 0000000000..5c8c4f7855
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml
@@ -0,0 +1,23 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ name: "id-kmwkw"
+ labels:
+ job-name: "id"
+ druid.k8s.peons: "true"
+ annotations:
+ task:
"H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
+spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ env:
+ - name: "TASK_DIR"
+ value: "/tmp/id"
+ - name: "TASK_JSON"
+ valueFrom:
+ fieldRef:
+ fieldPath: "metadata.annotations['task']"
+ image: one
+ name: primary
\ No newline at end of file
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml
new file mode 100644
index 0000000000..81e7d44154
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodTemplate.yaml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: PodTemplate
+metadata:
+ name: test
+template:
+ spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ image: one
+ name: primary
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml
new file mode 100644
index 0000000000..c56ba7c9b6
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml
@@ -0,0 +1,21 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ name: "id-kmwkw"
+ labels:
+ job-name: "id"
+ annotations:
+spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ env:
+ - name: "TASK_DIR"
+ value: "/tmp/id"
+ - name: "TASK_JSON"
+ valueFrom:
+ fieldRef:
+ fieldPath: "metadata.annotations['task']"
+ image: one
+ name: primary
\ No newline at end of file
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
new file mode 100644
index 0000000000..13b2e0922c
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -0,0 +1,40 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: "id"
+ labels:
+ druid.k8s.peons: "true"
+ annotations:
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+spec:
+ activeDeadlineSeconds: 14400
+ backoffLimit: 0
+ ttlSecondsAfterFinished: 172800
+ template:
+ metadata:
+ labels:
+ druid.k8s.peons: "true"
+ annotations:
+ task:
"H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+ tls.enabled: "false"
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+ spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ env:
+ - name: "TASK_DIR"
+ value: "/tmp"
+ - name: "TASK_JSON"
+ valueFrom:
+ fieldRef:
+ fieldPath: "metadata.annotations['task']"
+ image: one
+ name: primary
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
new file mode 100644
index 0000000000..1d05d6ad93
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -0,0 +1,40 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: "id"
+ labels:
+ druid.k8s.peons: "true"
+ annotations:
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+spec:
+ activeDeadlineSeconds: 14400
+ backoffLimit: 0
+ ttlSecondsAfterFinished: 172800
+ template:
+ metadata:
+ labels:
+ druid.k8s.peons: "true"
+ annotations:
+ task:
"H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+ tls.enabled: "true"
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+ spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ env:
+ - name: "TASK_DIR"
+ value: "/tmp"
+ - name: "TASK_JSON"
+ valueFrom:
+ fieldRef:
+ fieldPath: "metadata.annotations['task']"
+ image: one
+ name: primary
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]