This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7409070f945 feat: podTemplateSelectionKey context for picking pod
templates from task specs (#19419)
7409070f945 is described below
commit 7409070f9457478f8ee51805a0f4043d14cf31d5
Author: Armanit Garg <[email protected]>
AuthorDate: Fri May 8 04:22:55 2026 +0530
feat: podTemplateSelectionKey context for picking pod templates from task
specs (#19419)
* feat: podTemplateSelectionKey context key for picking pod templates from
task specs
* runtime prop to allow overrides
* add some logging
---
docs/development/extensions-core/k8s-jobs.md | 10 ++
.../k8s/overlord/KubernetesTaskRunnerConfig.java | 16 +++-
.../KubernetesTaskRunnerEffectiveConfig.java | 6 ++
.../overlord/KubernetesTaskRunnerStaticConfig.java | 17 +++-
.../k8s/overlord/common/DruidK8sConstants.java | 1 +
.../DynamicConfigPodTemplateSelector.java | 23 +++++
.../DynamicConfigPodTemplateSelectorTest.java | 106 ++++++++++++++++++++-
7 files changed, 176 insertions(+), 3 deletions(-)
diff --git a/docs/development/extensions-core/k8s-jobs.md
b/docs/development/extensions-core/k8s-jobs.md
index 5ebd97c04ee..d6e8265a47e 100644
--- a/docs/development/extensions-core/k8s-jobs.md
+++ b/docs/development/extensions-core/k8s-jobs.md
@@ -763,6 +763,16 @@ All three examples below are equivalent.
In all the above cases, Druid will match the selector to any value of task
type. Druid applies similar logic for `dataSource`. For `context.tags` setting
`null` or an empty object `{}` is equivalent.
+##### Override pod template via context
+
+Set the `podTemplateSelectionKey` key in a task's context to pick a configured
pod template directly, bypassing the selection strategy. The value is the same
`selectionKey` used by the `selectorBased` strategy (i.e. the suffix of
`druid.indexer.runner.k8s.podTemplate.<selectionKey>`).
+
+```json
+"context": { "podTemplateSelectionKey": "podSpec1" }
+```
+
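+This is gated by the runtime property
`druid.indexer.runner.allowTaskPodTemplateSelection`, which defaults to
`false`. While it is `false`, the context key is ignored (a warning is logged)
and the configured selection strategy is used. When it is `true` and the key
doesn't match any configured template, the task fails to launch.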
+
#### Running Task Pods in Another Namespace
It is possible to run task pods in a different namespace from the rest of your
Druid cluster.
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 0e96e6ab2c3..99bb0a3bd8a 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -91,6 +91,12 @@ public interface KubernetesTaskRunnerConfig
*/
Period getK8sSharedInformerResyncPeriod();
+ /**
+ * Whether tasks may select a configured pod template via the {@code
DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY}
+ * task context key, overriding the configured {@code
PodTemplateSelectStrategy}.
+ */
+ boolean isAllowTaskPodTemplateSelection();
+
static Builder builder()
{
return new Builder();
@@ -121,6 +127,7 @@ public interface KubernetesTaskRunnerConfig
private Period logSaveTimeout;
private boolean useK8sSharedInformers;
private Period k8sSharedInformerResyncPeriod;
+ private boolean allowTaskPodTemplateSelection;
public Builder()
{
@@ -265,6 +272,12 @@ public interface KubernetesTaskRunnerConfig
return this;
}
+ public Builder withAllowTaskPodTemplateSelection(boolean
allowTaskPodTemplateSelection)
+ {
+ this.allowTaskPodTemplateSelection = allowTaskPodTemplateSelection;
+ return this;
+ }
+
public KubernetesTaskRunnerStaticConfig build()
{
return new KubernetesTaskRunnerStaticConfig(
@@ -290,7 +303,8 @@ public interface KubernetesTaskRunnerConfig
this.capacity,
this.taskJoinTimeout,
this.useK8sSharedInformers,
- this.k8sSharedInformerResyncPeriod
+ this.k8sSharedInformerResyncPeriod,
+ this.allowTaskPodTemplateSelection
);
}
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
index 8343ebe1587..227b78d7e84 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
@@ -189,6 +189,12 @@ public class KubernetesTaskRunnerEffectiveConfig
implements KubernetesTaskRunner
return staticConfig.getK8sSharedInformerResyncPeriod();
}
+ @Override
+ public boolean isAllowTaskPodTemplateSelection()
+ {
+ return staticConfig.isAllowTaskPodTemplateSelection();
+ }
+
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null
|| dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
index 4f89bfae09b..c6cbc685438 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
@@ -152,6 +152,10 @@ public class KubernetesTaskRunnerStaticConfig implements
KubernetesTaskRunnerCon
@JsonProperty
private Period k8sSharedInformerResyncPeriod = new Period("PT5M");
+ @JsonProperty
+ // Allow per-task pod template selection via the task context.
+ private boolean allowTaskPodTemplateSelection = false;
+
public KubernetesTaskRunnerStaticConfig()
{
}
@@ -179,7 +183,8 @@ public class KubernetesTaskRunnerStaticConfig implements
KubernetesTaskRunnerCon
Integer capacity,
Period taskJoinTimeout,
boolean useK8sSharedInformers,
- Period k8sSharedInformerResyncPeriod
+ Period k8sSharedInformerResyncPeriod,
+ boolean allowTaskPodTemplateSelection
)
{
this.namespace = namespace;
@@ -265,6 +270,10 @@ public class KubernetesTaskRunnerStaticConfig implements
KubernetesTaskRunnerCon
k8sSharedInformerResyncPeriod,
this.k8sSharedInformerResyncPeriod
);
+ this.allowTaskPodTemplateSelection = ObjectUtils.getIfNull(
+ allowTaskPodTemplateSelection,
+ this.allowTaskPodTemplateSelection
+ );
}
@Override
@@ -406,4 +415,10 @@ public class KubernetesTaskRunnerStaticConfig implements
KubernetesTaskRunnerCon
{
return k8sSharedInformerResyncPeriod;
}
+
+ @Override
+ public boolean isAllowTaskPodTemplateSelection()
+ {
+ return allowTaskPodTemplateSelection;
+ }
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index b1d8ac262b1..f0eba8c63e6 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -46,6 +46,7 @@ public class DruidK8sConstants
public static final String OVERLORD_NAMESPACE_KEY =
"druid.overlord.namespace";
public static final String DRUID_LABEL_PREFIX = "druid.";
public static final String BASE_TEMPLATE_NAME = "base";
+ public static final String TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY =
"podTemplateSelectionKey";
public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB
public static final ImmutableList<String>
BLACKLISTED_PEON_POD_ERROR_MESSAGES = ImmutableList.of(
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
index ff4a4c2cd72..735eaf1dc5d 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
@@ -26,7 +26,9 @@ import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import java.io.File;
import java.nio.file.Files;
@@ -37,6 +39,7 @@ import java.util.Set;
public class DynamicConfigPodTemplateSelector implements PodTemplateSelector
{
+ private static final Logger log = new
Logger(DynamicConfigPodTemplateSelector.class);
private static final String TASK_PROPERTY =
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+ ".k8s.podTemplate.";
@@ -119,6 +122,26 @@ public class DynamicConfigPodTemplateSelector implements
PodTemplateSelector
@Override
public Optional<PodTemplateWithName> getPodTemplateForTask(Task task)
{
+ String requested =
task.getContextValue(DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY);
+
+ if (requested != null &&
effectiveConfig.isAllowTaskPodTemplateSelection()) {
+ Supplier<PodTemplate> supplier = podTemplates.get(requested);
+ if (supplier == null) {
+ throw new IAE(
+ "Task [%s] requested pod template [%s] via context key, but no
such template is configured.",
+ task.getId(), requested
+ );
+ }
+ log.debug("Pod template [%s] selected for task [%s] via context
override.", requested, task.getId());
+ return Optional.of(new PodTemplateWithName(requested, supplier.get()));
+ } else if (requested != null) {
+ log.warn(
+ "Task [%s] set context key [%s] but pod template override is
disabled; ignoring.",
+ task.getId(),
+ DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY
+ );
+ }
+
return
Optional.of(effectiveConfig.getPodTemplateSelectStrategy().getPodTemplateForTask(task,
podTemplates));
}
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
index a2efc5c88fc..fbd0691efda 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.PodTemplateBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
@@ -32,6 +33,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import
org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
import
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
@@ -55,6 +57,7 @@ public class DynamicConfigPodTemplateSelectorTest
private Path tempDir;
private ObjectMapper mapper;
private PodTemplate podTemplateSpec;
+ private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
private KubernetesTaskRunnerEffectiveConfig effectiveConfig;
@BeforeEach
@@ -62,7 +65,7 @@ public class DynamicConfigPodTemplateSelectorTest
{
mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml",
PodTemplate.class);
- Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new
DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY,
1);
+ dynamicConfigRef = () -> new
DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY,
1);
KubernetesTaskRunnerStaticConfig staticConfig =
KubernetesTaskRunnerConfig.builder().build();
effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig,
dynamicConfigRef);
}
@@ -343,4 +346,105 @@ public class DynamicConfigPodTemplateSelectorTest
Assertions.assertEquals("base", actual.get().getName());
Assertions.assertEquals(expected, actual.get().getPodTemplate());
}
+
+ @Test
+ public void
test_fromTask_contextPodTemplateSelectionKey_returnsRequestedTemplate() throws
IOException
+ {
+ Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
+
+ Path lowThroughputTemplatePath =
Files.createFile(tempDir.resolve("low-throughput.yaml"));
+ PodTemplate lowThroughputPodTemplate = new
PodTemplateBuilder(podTemplateSpec)
+ .editTemplate()
+ .editSpec()
+ .setNewVolumeLike(0, new VolumeBuilder().withName("volume").build())
+ .endVolume()
+ .endSpec()
+ .endTemplate()
+ .build();
+ mapper.writeValue(lowThroughputTemplatePath.toFile(),
lowThroughputPodTemplate);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
baseTemplatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput",
lowThroughputTemplatePath.toString());
+
+ KubernetesTaskRunnerStaticConfig staticConfig =
KubernetesTaskRunnerConfig.builder()
+ .withAllowTaskPodTemplateSelection(true)
+ .build();
+ effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig,
dynamicConfigRef);
+
+ DynamicConfigPodTemplateSelector selector = new
DynamicConfigPodTemplateSelector(props, effectiveConfig);
+
+ Task task = new NoopTask(
+ "id", "id", "datasource", 0, 0,
+
ImmutableMap.of(DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY,
"lowThroughput")
+ );
+ Optional<PodTemplateWithName> actual =
selector.getPodTemplateForTask(task);
+
+ Assertions.assertTrue(actual.isPresent());
+ Assertions.assertEquals("lowThroughput", actual.get().getName());
+ Assertions.assertEquals(1,
actual.get().getPodTemplate().getTemplate().getSpec().getVolumes().size());
+ }
+
+ @Test
+ public void
test_fromTask_contextPodTemplateSelectionKey_unknownName_throwsIAE() throws
IOException
+ {
+ Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
baseTemplatePath.toString());
+
+ KubernetesTaskRunnerStaticConfig staticConfig =
KubernetesTaskRunnerConfig.builder()
+ .withAllowTaskPodTemplateSelection(true)
+ .build();
+ effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig,
dynamicConfigRef);
+
+ DynamicConfigPodTemplateSelector selector = new
DynamicConfigPodTemplateSelector(props, effectiveConfig);
+
+ Task task = new NoopTask(
+ "id", "id", "datasource", 0, 0,
+
ImmutableMap.of(DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY,
"doesNotExist")
+ );
+
+ Exception exception = Assertions.assertThrows(
+ IAE.class,
+ () -> selector.getPodTemplateForTask(task)
+ );
+ Assertions.assertTrue(exception.getMessage().contains("[id]"));
+ Assertions.assertTrue(exception.getMessage().contains("[doesNotExist]"));
+ }
+
+ @Test
+ public void
test_fromTask_contextPodTemplateSelectionKey_disabledByDefault_isIgnored()
throws IOException
+ {
+ Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
+
+ Path lowThroughputTemplatePath =
Files.createFile(tempDir.resolve("low-throughput.yaml"));
+ PodTemplate lowThroughputPodTemplate = new
PodTemplateBuilder(podTemplateSpec)
+ .editTemplate()
+ .editSpec()
+ .setNewVolumeLike(0, new VolumeBuilder().withName("volume").build())
+ .endVolume()
+ .endSpec()
+ .endTemplate()
+ .build();
+ mapper.writeValue(lowThroughputTemplatePath.toFile(),
lowThroughputPodTemplate);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base",
baseTemplatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput",
lowThroughputTemplatePath.toString());
+
+ DynamicConfigPodTemplateSelector selector = new
DynamicConfigPodTemplateSelector(props, effectiveConfig);
+
+ Task task = new NoopTask(
+ "id", "id", "datasource", 0, 0,
+
ImmutableMap.of(DruidK8sConstants.TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY,
"lowThroughput")
+ );
+ Optional<PodTemplateWithName> actual =
selector.getPodTemplateForTask(task);
+
+ Assertions.assertTrue(actual.isPresent());
+ Assertions.assertEquals("base", actual.get().getName());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]